Re: Stopping a Kafka consumer gracefully (without losing in-flight events; StoppableFunction)
Thanks for the reply; is there a FLIP for this?

- bart

On Mon, Feb 19, 2018, at 5:50 PM, Till Rohrmann wrote:
> Hi Bart,
>
> you're right that Flink currently does not support a graceful stop
> mechanism for the Kafka source. The community already has a good idea
> how to solve it in the general case and will hopefully soon add it to
> Flink.
>
> Concerning the StoppableFunction: this interface was introduced quite
> some time ago and currently only works for some batch sources. In
> order to make it work with streaming, we need to add some more
> functionality to the engine to properly stop and take a savepoint.
>
> Cheers,
> Till
>
> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans <fl...@kasterma.net> wrote:
>> In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html
>> it is shown that for gracefully stopping a job you need to implement
>> the StoppableFunction interface. This appears not (yet) to be
>> implemented for Kafka consumers. Am I missing something, or is there a
>> different way to gracefully stop a job using a Kafka source so we can
>> restart it later without losing any in-flight events?
>>
>> - bart
Stopping a Kafka consumer gracefully (without losing in-flight events; StoppableFunction)
In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html it is shown that for gracefully stopping a job you need to implement the StoppableFunction interface. This appears not (yet) to be implemented for Kafka consumers. Am I missing something, or is there a different way to gracefully stop a job using a Kafka source so we can restart it later without losing any in-flight events?

- bart
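[Editor's note] For context on the interface being discussed: a streaming source that supported graceful stop would look roughly like the sketch below. This is a hypothetical source (the actual Kafka consumer does not implement StoppableFunction, which is the point of the thread); package names match Flink 1.4-era APIs.

```scala
import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical example source for illustration only.
class MyStoppableSource extends SourceFunction[String] with StoppableFunction {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning) {
      // read and emit records via ctx.collect(...)
    }
    // Returning normally lets the job finish cleanly, so downstream
    // operators can drain their in-flight records.
  }

  // stop() requests a graceful shutdown (used by `flink stop`) ...
  override def stop(): Unit = { isRunning = false }

  // ... while cancel() is the hard teardown (used by `flink cancel`).
  override def cancel(): Unit = { isRunning = false }
}
```

The difference is only in intent: after stop() the job is expected to finish and produce a consistent final state, whereas cancel() tears the job down immediately.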
Classpath for execution of Kafka serializer/deserializer; java.lang.NoClassDefFoundError for a class that is in the job jar
I have a custom serializer for writing/reading from Kafka. I set it up in main with code like the following:

    val kafkaConsumerProps = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
    kafkaConsumerProps.setProperty("group.id", s"normalize-call-events-${scala.util.Random.nextInt}")
    kafkaConsumerProps.setProperty("client.id", s"normalize-call-events-${scala.util.Random.nextInt}")

    val source = new FlinkKafkaConsumer010[RaeEvent](
      sourceTopic,
      new KafkaRaeEventSerializer(schemaBaseDirectory),
      kafkaConsumerProps)

This throws java.lang.NoClassDefFoundError for classes that are in my job jar. Printing the classpath doesn't show those libraries explicitly (but they are also not shown explicitly in places where they are found; I guess the current jar is not shown on the classpath). I don't know how to list the current classloaders. Also, the error goes away when I add the dependency to /flink/lib and restart Flink. Hence my conjecture that in the Kafka serializer/deserializer context the dependencies from my job jar are not available.

Flink version: 1.2.0

Any help greatly appreciated; I'll be happy to provide additional info. Pointers to where in the Flink code I should have looked to answer this myself would also be greatly appreciated.

- bart
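[Editor's note] One plausible cause, an assumption rather than something confirmed by the snippet: Flink loads job-jar classes through a separate user-code classloader, and code that resolves classes reflectively (e.g. a Class.forName inside a serializer) can end up asking a loader that only sees /flink/lib. Passing the thread context classloader explicitly, which Flink sets to the user-code classloader while user functions run, usually makes job-jar classes visible. A minimal sketch (it loads a JDK class only so the example is self-contained; in the real job the name would be a class from your jar):

```scala
// Resolve a class through the thread context classloader. A bare
// Class.forName(name) uses the defining loader of the calling class,
// which inside Flink may not see the job jar at all.
def loadUserClass(name: String): Class[_] = {
  val cl = Thread.currentThread().getContextClassLoader
  Class.forName(name, true, cl)
}

// JDK class used here only to keep the sketch runnable anywhere.
val listClass = loadUserClass("java.util.ArrayList")
println(listClass.getName)  // prints "java.util.ArrayList"
```

If the library really does need to be resolvable by Flink's own classes (as with some serialization frameworks), putting it in /flink/lib, as you discovered, is the usual workaround.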
Database connection from job
I am using the Scala API for Flink and am trying to set up a JDBC database connection in my job (on every incoming event I want to query the database for data to enrich the event). Because the code is serialized and deserialized as it is sent from the Flink master to the Flink workers, I cannot just open the connection in my main method.

Can someone give me a pointer to the lifecycle methods that are called by the worker to do local initialization of the job? I have not yet been able to find any references or examples of this in the documentation.

Thanks!

Best,
Bart
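[Editor's note] The lifecycle hooks being asked about are on the Rich* variants of the function interfaces: open() runs once per parallel task instance on the worker, after the function has been deserialized there, and close() runs on shutdown. A minimal sketch; the event types, JDBC URL, credentials, and query are all placeholders:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Placeholder event types for illustration.
case class Event(userId: Long, payload: String)
case class EnrichedEvent(userId: Long, payload: String, userName: String)

class EnrichFromDb extends RichMapFunction[Event, EnrichedEvent] {
  // @transient: created on the worker in open(), never serialized with the job.
  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // Called once per parallel task instance, on the worker.
    conn = DriverManager.getConnection("jdbc:postgresql://db-host:5432/mydb", "user", "secret")
    stmt = conn.prepareStatement("SELECT name FROM users WHERE id = ?")
  }

  override def map(e: Event): EnrichedEvent = {
    stmt.setLong(1, e.userId)
    val rs = stmt.executeQuery()
    val name = if (rs.next()) rs.getString(1) else "unknown"
    rs.close()
    EnrichedEvent(e.userId, e.payload, name)
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```

Note that a synchronous query per event bounds throughput by database latency; for higher volumes a local cache or Flink's async I/O support is worth considering.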