I have a custom serializer for writing to and reading from Kafka. I am
setting it up in main with code as follows:

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
    kafkaConsumerProps.setProperty("group.id", s"normalize-call-events-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id", s"normalize-call-events-${scala.util.Random.nextInt}")
    val source = new FlinkKafkaConsumer010[RaeEvent](
      sourceTopic,
      new KafkaRaeEventSerializer(schemaBaseDirectory),
      kafkaConsumerProps)

This generates a java.lang.NoClassDefFoundError for classes that are
in my job jar. Printing the classpath doesn't show the libraries
explicitly (but they are also not shown explicitly in places where they
are found; I guess the job jar itself is not shown on the classpath). I
don't know how to list the current classloaders.
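
A minimal sketch of what listing them might look like, assuming that
walking getParent from the thread context classloader and from the
loader of the calling class is the relevant thing to inspect.
ClassLoaderDump and chain are hypothetical names for illustration, not
Flink API:

```scala
object ClassLoaderDump {
  // Walk from the given loader up through its parents and describe each one.
  // A null loader (the bootstrap loader) ends the chain.
  def chain(start: ClassLoader): List[String] =
    Iterator.iterate(start)(_.getParent)
      .takeWhile(_ != null)
      .map {
        // URL-based loaders can also report the jars/directories they search.
        case u: java.net.URLClassLoader =>
          s"${u.getClass.getName} urls=${u.getURLs.mkString("[", ", ", "]")}"
        case other =>
          other.getClass.getName
      }
      .toList

  def main(args: Array[String]): Unit = {
    println("Thread context classloader chain:")
    chain(Thread.currentThread().getContextClassLoader).foreach(l => println("  " + l))

    println("Classloader chain of this class:")
    chain(getClass.getClassLoader).foreach(l => println("  " + l))
  }
}
```

Calling ClassLoaderDump.chain from inside the serializer (rather than
only from main) would presumably show which loaders are visible in that
context, but that is an untested guess on my side.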

Also, the error goes away when I add the dependency to /flink/lib and
restart Flink. Hence my conjecture that in the Kafka
serializer/deserializer context the dependencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

I would also appreciate pointers to where I should have looked in the
Flink code to work out the answer myself.

- bart
