The test code is as follows:
--------------------------
public class Sink_KafkaSink_1 {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromPropertiesFile(
                Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer producer = new FlinkKafkaProducer(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization
                properties);
        // attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);

        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}

class CustomsourceFuncation implements SourceFunction<String> {

    //private long count = 1L;
    // volatile so that cancel() from another thread is visible to run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // the book "ranking list" used as test messages
            List<String> books = new ArrayList<>();
            books.add("msg1");
            books.add("msg2");
            books.add("msg3");
            books.add("msg4");
            books.add("msg5");
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            // emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // called when the source is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}
------------------------------------------
This runs locally without any problem. After packaging with Maven and submitting it to the YARN cluster in application mode, the JobManager keeps reporting the following error in a loop:
----------------------------------
2021-01-22 07:54:31,929 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2021-01-22 07:54:32,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[quickstart-0.1.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
    ... 23 more
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
----------------------------------
This is Flink 1.12.1. Deploying the same job in per-job mode works fine, and data can be consumed normally from the flinktest topic.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
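PS: the root-cause line ("ByteArraySerializer is not an instance of Serializer") usually points at two copies of kafka-clients being visible through different classloaders, e.g. one bundled into the fat jar and one on the cluster classpath. I have not verified this on our cluster, but as a sketch of a possible workaround, one could try forcing parent-first class resolution in flink-conf.yaml:

```
# flink-conf.yaml -- sketch only, not verified on this cluster
# Default is child-first; parent-first avoids loading a second copy of
# classes (such as the Kafka serializers) from the user fat jar.
classloader.resolve-order: parent-first
```

Alternatively, excluding or shading kafka-clients in the Maven build so it is not packaged into quickstart-0.1.jar twice might achieve the same effect.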