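Could you check whether the corresponding jar actually contains this class? You can do it as follows (assuming your Blink installation package is in /tmp/blink):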

cd /tmp/blink/opt/connectors/kafka011
jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C

On Fri, Feb 22, 2019 at 2:56 PM 张洪涛 <hongtao12...@163.com> wrote:

> Hello everyone!
>
> I am testing the Blink SQL client Kafka sink connector, but writes to
> Kafka fail. Here are my steps.
>
> Environment
> Blink in standalone mode
>
> 1. Configure the environment and start the SQL client
>
> 2. Create the Kafka sink table
> CREATE TABLE kafka_sink (
>   messageKey VARBINARY,
>   messageValue VARBINARY,
>   PRIMARY KEY (messageKey))
> with (
>   type='KAFKA011',
>   topic='sink-topic',
>   `bootstrap.servers`='172.19.0.108:9092',
>   retries='3'
> );
>
> 3. Create and run the query
> INSERT INTO kafka_sink
> SELECT CAST('123' AS VARBINARY) AS key,
> CAST(CONCAT_WS(',', 'HELLO', 'WORLD') AS VARBINARY) AS msg;
>
> Error log (from the task executor log)
>
> The main problem is that a class under the Kafka common package cannot
> be found. However, the Kafka connector jars were already included when
> the SQL client was started, and when the job is submitted these jars are
> uploaded to the cluster together with the job graph, so in theory all of
> these classes should be loaded.
>
> 2019-02-22 14:37:18,356 ERROR
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread
> - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.NoClassDefFoundError:
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:339)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:293)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:391)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:485)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:254)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> --
> Best Regards
> Hongtao

--
If criticism is not free, praise is meaningless!