能否看一下对应的包里是否有这个类, 方法如下(假设你的blink安装包在 /tmp/blink):

cd /tmp/blink/opt/connectors/kafka011
jar -tf flink-connector-kafka-0.11_2.11-*.jar | grep Crc32C

On Fri, Feb 22, 2019 at 2:56 PM 张洪涛 <hongtao12...@163.com> wrote:

>
>
> 大家好!
>
>
> 我正在测试Blink sql client kafka sink connector ,但是发现写入失败,以下是我的步骤
>
>
> 环境配置
> blink standalone 模式
>
>
>
>
> 1. 配置environment 启动sql client
>
>
> 2. 创建kafka sink table
> CREATETABLEkafka_sink(
>    messageKeyVARBINARY,
>    messageValueVARBINARY,
>    PRIMARYKEY(messageKey))
> with(
>    type='KAFKA011',
>    topic='sink-topic',
>    `bootstrap.servers`='172.19.0.108:9092',
>    retries='3'
> );
>
>
> 3. 创建查询语句并执行
> INSERT INTO kafka_sink
> SELECT CAST('123' AS VARBINARY) AS key,
> CAST(CONCAT_WS(',', 'HELLO', 'WORLD') AS VARBINARY) AS msg;
>
>
>
>
> 错误日志(from task executor log)
>
>
> 主要是找不到kafka common package下面的一个类, 但是启动sql client 时候已经把kafka connector
> 相关的jar包包括在内 在提交job时候 也会把这些jars 和 jobgraph一并上传到cluster,理论上这些class都会被加载
>
>
>
>
>
>
> 2019-02-22 14:37:18,356 ERROR
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread
> - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.NoClassDefFoundError:
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/utils/Crc32C
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.DefaultRecordBatch.writeHeader(DefaultRecordBatch.java:468)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.writeDefaultBatchHeader(MemoryRecordsBuilder.java:339)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:293)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:391)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:485)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:254)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:223)
>         at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.Crc32C
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
>
>
> --
>   Best Regards
>   Hongtao
>
>

-- 
若批評無自由,則讚美無意義!

回复