Hi, this is a dependency problem. If flink-connector-kafka.jar is already present under the cluster's flink/lib, the corresponding dependency in the job's pom must be marked as provided, so it is not bundled into the fat jar a second time.
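A minimal sketch of the pom change, assuming the connector artifact is flink-connector-kafka_2.11 at version 1.12.1 (adjust the Scala suffix and version to whatever your cluster actually ships):

```xml
<!-- Mark the connector as provided so it is NOT packaged into the fat jar; -->
<!-- the copy under flink/lib is used at runtime instead. Having the Kafka  -->
<!-- classes on both the parent and the child (user-jar) classpath is what  -->
<!-- produces the "ByteArraySerializer is not an instance of Serializer"    -->
<!-- error below. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.1</version>
    <scope>provided</scope>
</dependency>
```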
On 2021-01-22 16:14:17, "lp" <973182...@qq.com> wrote:
>Test code is as follows:
>--------------------------
>public class Sink_KafkaSink_1 {
>    public static void main(String[] args) throws Exception {
>        final ParameterTool params = ParameterTool.fromPropertiesFile(
>                Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
>        String host = params.get("host");
>        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
>        produceTestdata2kafka(new StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
>    }
>
>    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.enableCheckpointing(5000);
>        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);
>
>        Properties properties = new Properties();
>        properties.setProperty("bootstrap.servers", kafkaAddr);
>
>        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>                "flinktest",              // topic
>                new SimpleStringSchema(), // message serialization
>                properties
>        );
>        // attach each record's event timestamp when writing to Kafka
>        producer.setWriteTimestampToKafka(true);
>        text.addSink(producer);
>        env.execute("[kafkaSink with custom source]");
>    }
>}
>
>class CustomsourceFuncation implements SourceFunction<String> {
>    //private long count = 1L;
>    private boolean isRunning = true;
>
>    @Override
>    public void run(SourceContext<String> ctx) throws Exception {
>        while (isRunning) {
>            // the book ranking list
>            List<String> books = new ArrayList<>();
>            books.add("msg1");
>            books.add("msg2");
>            books.add("msg3");
>            books.add("msg4");
>            books.add("msg5");
>            int i = new Random().nextInt(5);
>            ctx.collect(books.get(i));
>            // emit one record every 2 seconds
>            Thread.sleep(2000);
>        }
>    }
>
>    // called when the source is canceled
>    @Override
>    public void cancel() {
>        isRunning = false;
>    }
>}
>------------------------------------------
>
>This runs locally with no exceptions. After packaging with Maven and submitting it to the YARN cluster in application mode, the jobmanager keeps looping with the following error:
>----------------------------------
>2021-01-22 07:54:31,929 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
>2021-01-22 07:54:32,930 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
>2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
>2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED.
>2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING.
>2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
>2021-01-22 07:54:32,950 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING.
>2021-01-22 07:54:32,969 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913).
>org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
>    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[quickstart-0.1.jar:?]
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[quickstart-0.1.jar:?]
>    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
>Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
>    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
>    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
>    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
>    ... 23 more
>2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
>2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
>2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
>2021-01-22 07:54:33,973 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
>----------------------------------
>
>
>Flink version 1.12.1. Deploying the same job in per-job mode works fine, and the data can be consumed normally from the flinktest topic.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
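For the record: the "ByteArraySerializer is not an instance of Serializer" in the trace above is the classic symptom of the same Kafka classes being loaded by both Flink's parent classloader (from flink/lib) and the job's child-first user-code classloader (from the fat jar). Besides rebuilding the jar with the connector scoped as provided, a workaround sometimes used is switching the classloader resolution order in flink-conf.yaml; note this is a cluster-wide setting, so the pom fix is the cleaner option:

```yaml
# Resolve classes against the Flink/parent classpath first, so the Kafka
# classes bundled in the user jar no longer shadow the copies in flink/lib.
classloader.resolve-order: parent-first
```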