flink on k8s

Project background

Stream processing (primarily ETL, with some secondary computation).

Version 0.1

Start a standalone cluster directly on a single machine and submit jobs through the web UI.

Drawback: all compute resources depend on a single machine, so this only works as an early demo deployment.

Version 0.2 (never implemented; only considered early on)

Deploy a standalone cluster across multiple machines and submit jobs through the web UI.

Drawback: not HA.

Version 0.3 (never implemented; only considered early on)

An HA standalone cluster, with jobs submitted through the web UI.

Drawback: this would actually meet production requirements, but it needs an additional ZooKeeper cluster, and since it is not on Kubernetes it does not fit the direction we are heading.
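
For reference, a minimal sketch of the flink-conf.yaml entries this option would have required; the ZooKeeper quorum address and the shared storage directory are placeholders, not values from a real deployment:

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181   # placeholder ZooKeeper quorum
high-availability.storageDir: file:///data/flink/ha/                # shared directory for JobManager metadata
high-availability.cluster-id: /flink-prod                           # isolates this cluster's znodes in ZooKeeper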

Version 1.0

Kubernetes standalone cluster, using the configuration from the official documentation.

Drawback: the official configuration is not an HA setup at all, and checkpoints can only use the in-memory state backend, which makes OOM very easy to hit.

Version 1.1

Kubernetes job cluster; the job jar is baked into the image when it is built.

Drawback: it really is HA now, but the checkpoint problem is still unsolved and OOM remains very easy to hit.

Version 1.2

Kubernetes job cluster plus Alibaba Cloud NAS storage (there is no Hadoop environment in production, hence no HDFS, and we did not want to use OSS).

Drawback: it is HA now and the OOM problem is solved; a TaskManager going down no longer affects the job. But a JobManager going down still breaks the job, because the job id changes on restart, so the new JobManager cannot find the old job's checkpoints.
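
The fix in version 1.2 is to point the state backend at the NAS volume instead of the JobManager heap. A minimal sketch of the flink-conf.yaml changes, assuming the same /data/flink/checkpoints mount path used in the manifests below (the savepoint directory is only an illustrative extra):

state.backend: filesystem                               # FsStateBackend instead of the in-memory default
state.checkpoints.dir: file:///data/flink/checkpoints   # NAS volume mounted into both JM and TM pods
state.savepoints.dir: file:///data/flink/savepoints     # illustrative path on the same shared volume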

Version 2.0

Kubernetes job cluster plus Alibaba Cloud NAS storage (no Hadoop environment in production, no HDFS, no desire to use OSS) plus a fixed job id.

Drawback: none so far. It has been running happily in production, and there should be no remaining deployment issues for the time being.
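
The key to version 2.0 is the fixed job id: checkpoints are written under the job id inside the checkpoint directory on the NAS volume, so keeping the id constant keeps the checkpoint path stable across JobManager restarts and lets a new JobManager be pointed back at the job's retained checkpoints. In the manifests below this is just the --job-id argument of the job-cluster container:

args:
  - "job-cluster"
  - "--job-id"
  - "00000000000000000000000000000000"   # fixed id so the checkpoint path survives a JobManager restart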

Version 2.1

Version 2.0 plus static PV and PVC.

Finally, the Kubernetes manifests:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-nas-pv
spec:
  capacity:
    storage: 100Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: nas-flink-rel
  mountOptions:
    - nolock,tcp,noresvport
    - nfsvers=4
  nfs:
    path: /
    server: xxx
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-rel-pvc
  namespace: nse
spec:
  accessModes:
    - ReadWriteMany
  volumeMode: Filesystem
  resources:
    requests:
      storage: 100Gi
  storageClassName: nas-flink-rel
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-job-cluster
  labels:
    app: flink
  namespace: xxx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
  template:
    metadata:
      labels:
        app: flink
        component: job-cluster
    spec:
      imagePullSecrets:
        - name: nse-docker-vpc
      restartPolicy: Always
      containers:
        - name: flink-job-cluster
          image: xxx
          envFrom:
            - configMapRef:
                name: flink-env
          resources:
            requests:
              memory: "4Gi"
            limits:
              memory: "5Gi"
          args:
            - "job-cluster"
            - "-Djobmanager.rpc.address=flink-job-cluster"
            - "-Dparallelism.default=6"
            - "-Dblob.server.port=6124"
            - "-Dqueryable-state.server.ports=6125"
            - "--job-id"
            - "00000000000000000000000000000000"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 6125
              name: query
            - containerPort: 8081
              name: ui
          env:
            - name: FLINK_JM_HEAP
              value: "4096m"
          readinessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 30
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 30
          volumeMounts:
            - name: flink-log
              mountPath: /opt/flink/log/
            - name: flink-checkpoint
              mountPath: /data/flink/checkpoints
      volumes:
        - name: flink-log
          hostPath:
            path: /var/log/flink/
        - name: flink-checkpoint
          persistentVolumeClaim:
            claimName: flink-rel-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: flink-job-cluster
  labels:
    app: flink
    component: job-cluster
  namespace: xxx
spec:
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: query
      port: 6125
    - name: ui
      port: 8081
      nodePort: 30881
  type: NodePort
  selector:
    app: flink
    component: job-cluster
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-task-manager
  namespace: xxx
  labels:
    app: flink
spec:
  replicas: 6
  selector:
    matchLabels:
      app: flink
  template:
    metadata:
      labels:
        app: flink
        component: task-manager
    spec:
      imagePullSecrets:
        - name: xxx
      restartPolicy: Always
      containers:
        - name: flink-task-manager
          image: xxx
          envFrom:
            - configMapRef:
                name: flink-env
          resources:
            requests:
              memory: "3Gi"
            limits:
              memory: "4Gi"
          args:
            - "task-manager"
            - "-Djobmanager.rpc.address=flink-job-cluster"
          env:
            - name: FLINK_TM_HEAP
              value: "3072m"
          volumeMounts:
            - name: flink-log
              mountPath: /opt/flink/log/
            - name: flink-checkpoint
              mountPath: /data/flink/checkpoints
      volumes:
        - name: flink-log
          hostPath:
            path: /var/log/flink
        - name: flink-checkpoint
          persistentVolumeClaim:
            claimName: flink-rel-pvc

For the actual Dockerfile, Google "flink job cluster github" and copy the corresponding files.