The test code is as follows:
--------------------------
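import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class Sink_KafkaSink_1 {
    public static void main(String[] args) throws Exception {
        // Read the Kafka host and port from pro.properties on the classpath
        final ParameterTool params = ParameterTool.fromPropertiesFile(
                Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
        String host = params.get("host");
        int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
        produceTestdata2kafka(host + ":" + kafkaPort);
    }

    private static void produceTestdata2kafka(String kafkaAddr) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 seconds in exactly-once mode
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddr);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "flinktest",              // topic
                new SimpleStringSchema(), // message serialization schema
                properties
        );
        // Attach each record's event timestamp when writing to Kafka
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("[kafkaSink with custom source]");
    }
}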

class CustomsourceFuncation implements SourceFunction<String> {
    //private long count = 1L;
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Book ranking list (sample messages)
        List<String> books = new ArrayList<>();
        books.add("msg1");
        books.add("msg2");
        books.add("msg3");
        books.add("msg4");
        books.add("msg5");
        Random random = new Random();
        while (isRunning) {
            // Emit one randomly chosen message
            ctx.collect(books.get(random.nextInt(books.size())));
            // Produce one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    // Called when the job is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}
------------------------------------------

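The job runs without errors in local tests. After packaging it with Maven and submitting it to a YARN cluster in application mode, the JobManager keeps looping through the following error: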
----------------------------------
2021-01-22 07:54:31,929 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2021-01-22 07:54:32,931 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28) switched from RUNNING to FAILED on container_1611044725922_0027_01_000002 @ slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[quickstart-0.1.jar:?]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[quickstart-0.1.jar:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
        ... 23 more
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-22 07:54:32,971 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RUNNING to RESTARTING.
2021-01-22 07:54:33,973 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job [kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched from state RESTARTING to RUNNING.
----------------------------------


The Flink version is 1.12.1. Deploying the same job in per-job mode works fine: data can be consumed from the flinktest topic as expected.
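
A possible way to narrow this down, offered as a sketch rather than a confirmed diagnosis: a "class X is not an instance of Y" error, where X plainly implements Y in source, typically means the two classes were loaded by different class loaders. The small helper below prints which class loader and jar each class comes from. The class name ClassOriginCheck and its describe method are hypothetical names made up for this illustration. The describe call could equally be placed in the job itself, for example in the source's run method, to see what the task actually resolves on the cluster.

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    /** Returns the class name, its class loader, and the location it was loaded from. */
    static String describe(Class<?> clazz) {
        ClassLoader loader = clazz.getClassLoader();
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        return clazz.getName()
                + " | loader=" + (loader == null ? "bootstrap" : loader.toString())
                + " | location=" + (source == null ? "unknown" : String.valueOf(source.getLocation()));
    }

    public static void main(String[] args) {
        // Default to the two classes named in the error message; other names can be passed as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.kafka.common.serialization.Serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer"
        };
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        for (String name : names) {
            try {
                // Load without initializing the class; only its origin is of interest here.
                System.out.println(describe(Class.forName(name, false, context)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " | not found on this classpath");
            }
        }
    }
}
```

If the two lines report different loaders or different jar locations, the Kafka client classes are probably present both in quickstart-0.1.jar and somewhere else on the cluster classpath, which would be worth comparing between application mode and per-job mode.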



--
Sent from: http://apache-flink.147419.n8.nabble.com/
