Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Arvid Heise
Hi Debraj,

Such errors are usually caused by having two versions of Kafka in different
places. Did you put flink-connector-kafka into lib/ or plugins/, or
specifically point YARN to it in some way?

You should only include it in your user jar, and the user jar should not
reside in any of the aforementioned places either.
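
If it helps, a rough way to confirm such a clash (just a sketch, not specific
to your setup) is to log, inside the job, where the Kafka serializer class and
the Serializer interface are actually loaded from:

    // Diagnostic sketch: print the jar each Kafka class comes from.
    // Two different locations usually mean two Kafka versions on the classpath.
    System.out.println(org.apache.kafka.common.serialization.ByteArraySerializer.class
            .getProtectionDomain().getCodeSource().getLocation());
    System.out.println(org.apache.kafka.common.serialization.Serializer.class
            .getProtectionDomain().getCodeSource().getLocation());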

Best,

Arvid

On Tue, Aug 24, 2021 at 2:02 PM Debraj Manna 
wrote:

> Fabian
>
> I am running it inside YARN.
>
> Thanks,
>
> On Tue, Aug 24, 2021 at 5:27 PM Fabian Paul 
> wrote:
>
>> Hi Debraj
>>
>> How do you run your application? If you run it from an IDE you can set a
>> breakpoint and inspect the serializer class which is used.
>>
>> Best,
>> Fabian
>
>


Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
Fabian

I am running it inside YARN.

Thanks,

On Tue, Aug 24, 2021 at 5:27 PM Fabian Paul 
wrote:

> Hi Debraj
>
> How do you run your application? If you run it from an IDE you can set a
> breakpoint and inspect the serializer class which is used.
>
> Best,
> Fabian


Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Fabian Paul
Hi Debraj

How do you run your application? If you run it from an IDE you can set a 
breakpoint and inspect the serializer class which is used.

Best,
Fabian

Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
Yes, initially I did not add `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG` or
`ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`. I was getting the same error,
so I tried setting them explicitly.

I ran mvn dependency:tree | grep -i kafka. I do not see any other version of
Kafka among the non-test dependencies, and I am not getting this error during
tests, only when running my Flink application.


[INFO] +- org.apache.flink:flink-connector-kafka_2.12:jar:1.13.1:compile
[INFO] |  +- org.apache.kafka:kafka-clients:jar:2.4.1:compile
[INFO] +- org.apache.kafka:kafka_2.12:jar:2.4.1:test
[INFO] +- org.apache.flink:flink-connector-kafka_2.12:test-jar:tests:1.13.1:test
[INFO] +- net.mguenther.kafka:kafka-junit:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-api:jar:2.4.0:test
[INFO] |  +- org.apache.kafka:connect-json:jar:2.4.0:test
[INFO] |  \- org.apache.kafka:connect-runtime:jar:2.4.0:test
[INFO] | +- org.apache.kafka:kafka-tools:jar:2.4.0:test
[INFO] | |  +- org.apache.kafka:kafka-log4j-appender:jar:2.4.0:test
[INFO] | +- org.apache.kafka:connect-transforms:jar:2.4.0:test


On Tue, Aug 24, 2021 at 5:08 PM Fabian Paul 
wrote:

> Hi Debraj,
>
> The error looks indeed strange. We recommend not setting
> `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG`
> or `ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`,
> because the connector will take care of it. Can you try to remove these
> calls and check if it makes a difference?
>
> Only looking at the error message it feels like different versions of
> the Kafka dependency are on the class path.
>
> Best,
> Fabian
>


Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Fabian Paul
Hi Debraj,

The error looks indeed strange. We recommend not setting
`ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG` or
`ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`,
because the connector will take care of it. Can you try to remove these calls
and check if it makes a difference?
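
For example (a minimal sketch based on your own snippet; the bootstrap.servers
value is just the placeholder from your example), the producer setup would
simply be:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "");  // placeholder, as in your example
    // No KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG here;
    // the connector sets the serializers itself.

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);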

Only looking at the error message it feels like different versions of the Kafka 
dependency are on the class path.

Best,
Fabian

Re: Flink 1.13.1 Kafka Producer Error

2021-08-24 Thread Debraj Manna
The same query has also been asked on Stack Overflow. There is another
related question on Stack Overflow as well. Does anyone have any suggestions?

On Mon, Aug 23, 2021 at 9:07 PM Debraj Manna 
wrote:

> I am trying to use the Flink Kafka producer like below:
>
> public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "");
>     props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>     props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>             ByteArraySerializer.class.getName());
>
>     return new FlinkKafkaProducer<>(
>             "FlinkSdmKafkaTopic",
>             new SerializationSchema("FlinkSdmKafkaTopic", 8),
>             props,
>             FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> }
>
> private static class SerializationSchema
>         implements KafkaSerializationSchema<SelfDescribingMessageDO> {
>     final String topic;
>     final int numPartitions;
>
>     public SerializationSchema(final String topic, final int numPartitions) {
>         this.topic = topic;
>         this.numPartitions = numPartitions;
>     }
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
>             @Nullable Long aLong) {
>         return new ProducerRecord<>(topic,
>                 KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
>                 sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
>                 sdm.toByteArray());
>     }
> }
>
> I am getting the below exception when trying to deploy the Flink job. I am
> not getting this error during unit tests.
>
> 2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source: MetricSource 
> -> Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map -> 
> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 
> transitionState:1069 Source: MetricSource -> Filter -> MetricStoreMapper -> 
> (Filter -> Timestamps/Watermarks -> Map -> Flat Map, Sink: 
> FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0 
> (5764a387ede7d6710bcf3ad4e248) switched from INITIALIZING to FAILED with 
> failure cause: org.apache.kafka.common.KafkaException: Failed to construct 
> kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
> at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> at 
> org.apache.flink.streaming.runtime.tasks.

Flink 1.13.1 Kafka Producer Error

2021-08-23 Thread Debraj Manna
I am trying to use the Flink Kafka producer like below:

public static FlinkKafkaProducer<SelfDescribingMessageDO> createProducer() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class.getName());

    return new FlinkKafkaProducer<>(
            "FlinkSdmKafkaTopic",
            new SerializationSchema("FlinkSdmKafkaTopic", 8),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}

private static class SerializationSchema
        implements KafkaSerializationSchema<SelfDescribingMessageDO> {
    final String topic;
    final int numPartitions;

    public SerializationSchema(final String topic, final int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(SelfDescribingMessageDO sdm,
            @Nullable Long aLong) {
        return new ProducerRecord<>(topic,
                KafkaPublisher.getPartitionId(sdm.getHashKey(), numPartitions),
                sdm.getHashKey().getBytes(StandardCharsets.UTF_8),
                sdm.toByteArray());
    }
}

I am getting the below exception when trying to deploy the Flink job. I am
not getting this error during unit tests.

2021-08-23T14:47:55.504Z WARN runtime.taskmanager.Task Source:
MetricSource -> Filter -> MetricStoreMapper -> (Filter ->
Timestamps/Watermarks -> Map -> Flat Map, Sink: FlinkKafkaProducer11,
Sink: TSDBSink14) (5/8)#0 transitionState:1069 Source: MetricSource ->
Filter -> MetricStoreMapper -> (Filter -> Timestamps/Watermarks -> Map
-> Flat Map, Sink: FlinkKafkaProducer11, Sink: TSDBSink14) (5/8)#0
(5764a387ede7d6710bcf3ad4e248) switched from INITIALIZING to
FAILED with failure cause: org.apache.kafka.common.KafkaException:
Failed to construct kafka producer
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.S