Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Chesnay Schepler

This issue sounds strikingly similar to FLINK-6965.

TL;DR: You must place classes loaded during serialization by the Kafka 
connector under /lib.
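
For anyone wanting to check this on their own setup, here is a minimal 
sketch for listing the classloader chain at a given point in the code and 
for testing whether a class is visible through a particular loader. The 
object and method names (ClassLoaderDebug, chain, canLoad) are made up for 
illustration and are not part of Flink. It is an assumption that calling 
this from inside the deserialization schema shows what the connector sees 
at that point.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not part of Flink or Kafka.
object ClassLoaderDebug {

  /** Walks from `start` up through its parents and describes each loader. */
  def chain(start: ClassLoader): List[String] = {
    @tailrec
    def loop(cl: ClassLoader, acc: List[String]): List[String] =
      if (cl == null) ("<bootstrap>" :: acc).reverse
      else loop(cl.getParent, describe(cl) :: acc)
    loop(start, Nil)
  }

  // URL-based loaders can report the jars/directories they search.
  private def describe(cl: ClassLoader): String = cl match {
    case u: java.net.URLClassLoader =>
      s"${u.getClass.getName} urls=${u.getURLs.mkString("[", ", ", "]")}"
    case other => other.getClass.getName
  }

  /** True if `className` resolves through `cl` (the class is not initialized). */
  def canLoad(className: String, cl: ClassLoader): Boolean =
    try { Class.forName(className, false, cl); true }
    catch { case _: ClassNotFoundException | _: NoClassDefFoundError => false }

  def main(args: Array[String]): Unit = {
    println("Context classloader chain:")
    chain(Thread.currentThread().getContextClassLoader).foreach(l => println(s"  $l"))
    println("Chain of the loader that defined this class:")
    chain(getClass.getClassLoader).foreach(l => println(s"  $l"))
  }
}
```

If the two chains differ, or canLoad returns false for a class that is in 
the job jar, the code at that point is not resolving classes through the 
job jar's loader, which would be consistent with the error going away once 
the dependency is under /lib.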



Re: Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Timo Walther

Hi Bart,

Usually, this error means that your Maven project configuration is not 
correct. Is your custom class included in the jar file that you submit 
to the cluster?
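
One way to answer that question is to look for the class entry in the jar 
directly. The following is a rough sketch using only the JDK's 
java.util.jar.JarFile; the object name JarCheck and its methods are 
hypothetical, and the jar path and class name are supplied by the caller.

```scala
import java.util.jar.JarFile

// Hypothetical helper for inspecting a job jar; not part of Flink.
object JarCheck {

  /** Maps a fully qualified class name to its entry path inside a jar. */
  def entryName(className: String): String =
    className.replace('.', '/') + ".class"

  /** True if the jar at `jarPath` contains a .class entry for `className`. */
  def containsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try jar.getEntry(entryName(className)) != null
    finally jar.close()
  }

  def main(args: Array[String]): Unit = args match {
    case Array(jarPath, className) =>
      println(s"$className in $jarPath: ${containsClass(jarPath, className)}")
    case _ =>
      println("usage: JarCheck <path-to-jar> <fully.qualified.ClassName>")
  }
}
```

If this reports true for the class named in the NoClassDefFoundError, the 
packaging is probably fine and the problem is more likely in how the class 
is looked up at runtime.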


It might make sense to share your pom.xml with us.

Regards,
Timo



Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Bart Kastermans

I have a custom serializer for writing/reading from Kafka.  I am setting
this up in main with code as follows:

    import java.util.Properties
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
    // random suffix so that every run gets a fresh group.id and client.id
    kafkaConsumerProps.setProperty("group.id", s"normalize-call-events-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id", s"normalize-call-events-${scala.util.Random.nextInt}")
    val source = new FlinkKafkaConsumer010[RaeEvent](
      sourceTopic,
      new KafkaRaeEventSerializer(schemaBaseDirectory),
      kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError on classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in places where they
are found; I guess the current jar is not shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart Flink.  Hence my conjecture that in the Kafka
serializer/deserializer context the dependencies from my job jar are
not available.

Flink version 1.2.0

Any help is greatly appreciated; I'll also be happy to provide additional
info.

I would also appreciate pointers to where I should have looked in the
Flink code to work out the answer myself.

- bart