flink 1.12.1 kafka 流批一体

流批一体

flink 1.12发布后,在api上真正做到了流批一体。在替换kafka consumer到kafka source,碰到很多坑,好不容易解决了,记录下。

不是最佳的方案,等着1.13发布修正。

坑1:使用官方镜像

1.12.1自己构建镜像会出现权限问题,可能是docker版本的问题,使用官方镜像后可以解决。

坑2:使用kafka source后kafka internal auth需要是plaintext

我们的集群使用的是SASL_PLAINTEXT的auth,在配置sasl后,程序无法运行,在深入代码之后,发现Enumerator内的consumer和admin
client没有配置sasl,导致无法客户端无法连接至server。

只能使用plaintext auth的kafka集群,也就是没有auth,代码如下。

1
2
3
4
5
6
7
8
9
KafkaSource
.builder[JsValue]()
.setBootstrapServers(bootstrapServers)
.setGroupId(groupId)
.setTopics(topic)
.setDeserializer(KafkaRecordDeserializer.valueOnly(classOf[JsValueDeserializer]))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setBounded(OffsetsInitializer.latest())
.build()

坑3:javax.management.InstanceAlreadyExistsException

无视,这个是kafka的老bug,因为相同的client id注册两次到JMX,可以把这个exception屏蔽掉。

坑4:topic和partition都不能为空

否则会出现各种错误,比如checkpoint失败,无法提交offset,无法消费等。

坑5:并行度与分区数要相同

不相同的话,任务无法结束。