Re: Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-20 Thread Bart Kastermans
Thanks for the reply; is there a FLIP for this?

- bart

On Mon, Feb 19, 2018, at 5:50 PM, Till Rohrmann wrote:
> Hi Bart,
> 
> you're right that Flink currently does not support a graceful stop
> mechanism for the Kafka source. The community has already a good
> idea how to solve it in the general case and will hopefully soon add
> it to Flink.
> 
> Concerning the StoppableFunction: This interface was introduced quite
> some time ago and currently only works for some batch sources. In
> order to make it work with streaming, we need to add some more
> functionality to the engine in order to properly stop and take a
> savepoint.
> 
> Cheers,
> Till
> 
> On Mon, Feb 19, 2018 at 3:36 PM, Bart Kastermans
> <fl...@kasterma.net> wrote:
>> In
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html
>> it is shown that for gracefully stopping a job you need to implement
>> the StoppableFunction interface.  This appears not (yet) implemented
>> for Kafka consumers.  Am I missing something, or is there a different
>> way to gracefully stop a job using a kafka source so we can restart it
>> later without losing any (in flight) events?
>> 
>>  - bart
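
A practical workaround at the time of this thread: Flink's CLI supports
cancel-with-savepoint, which takes a savepoint and then cancels the job,
so the job can later be resumed from that savepoint without losing
checkpointed state. This is a sketch; the job ID, jar path, and savepoint
directory below are placeholders, and cancelling is not as graceful as a
true "stop" (the job is cancelled rather than drained), but with
checkpointing enabled on the Kafka source no committed events are lost:

```shell
# Take a savepoint and cancel the job in one step.
# <jobId> and the savepoint target directory are placeholders.
bin/flink cancel -s hdfs:///flink/savepoints <jobId>

# Later, resume the job from the savepoint that was printed above.
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx path/to/job.jar
```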



Stopping a kafka consumer gracefully (no losing of inflight events, StoppableFunction)

2018-02-19 Thread Bart Kastermans
In https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html
it is shown that for gracefully stopping a job you need to implement the
StoppableFunction interface.  This appears not (yet) implemented for Kafka
consumers.  Am I missing something, or is there a different way to
gracefully stop a job using a kafka source so we can restart it later
without losing any (in flight) events?

- bart


Classpath for execution of KafkaSerializer/Deserializer; java.lang.NoClassDefFoundError while class in job jar

2017-11-29 Thread Bart Kastermans
I have a custom serializer for writing/reading from kafka.  I am setting
this up in main with code as follows:

val kafkaConsumerProps = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", kafka_bootstrap)
kafkaConsumerProps.setProperty("group.id",
  s"normalize-call-events-${scala.util.Random.nextInt}")
kafkaConsumerProps.setProperty("client.id",
  s"normalize-call-events-${scala.util.Random.nextInt}")
val source = new FlinkKafkaConsumer010[RaeEvent](
  sourceTopic,
  new KafkaRaeEventSerializer(schemaBaseDirectory),
  kafkaConsumerProps)

This generates java.lang.NoClassDefFoundError for classes that are
in my job jar.  Printing the classpath doesn't show the libraries
explicitly (but these are also not shown explicitly in places where they
are found; I guess the current jar is not shown on the classpath).  I
don't know how to list the current classloaders.

Also, the error goes away when I add the dependency to /flink/lib and
restart flink.  Hence my conjecture that in the kafka
serializer/deserializer context the dependencies from my job jar are
not available.

Flink version 1.2.0

Any help greatly appreciated; also I'll be happy to provide additional
info.

I'd also be grateful for a pointer to where in the Flink code I should
have looked to work out the answer myself.

- bart
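
When debugging problems like this, it can help to see which classloader
actually loaded a class: classes from flink/lib come from the system
classloader, while classes from the job jar come from Flink's user-code
classloader. The following is a minimal, Flink-independent sketch (the
object and method names are my own) that prints a class's loader and its
parent chain; calling describe() on the class that fails to load, from
inside the deserializer, would show which side of the divide it is on:

```scala
// Sketch: print which classloader loaded a given class, and walk the
// parent chain. Plain Scala; nothing here is Flink-specific.
object ClassLoaderDebug {
  def describe(cls: Class[_]): Unit = {
    // The bootstrap classloader is printed as null.
    println(s"${cls.getName} loaded by: ${cls.getClassLoader}")
    var cl = cls.getClassLoader
    while (cl != null) {
      println(s"  parent: ${cl.getParent}")
      cl = cl.getParent
    }
  }

  def main(args: Array[String]): Unit = {
    describe(classOf[String])  // bootstrap loader (null)
    describe(getClass)         // application classloader
  }
}
```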


Database connection from job

2017-08-24 Thread Bart Kastermans
I am using the Scala API for Flink, and am trying to set up a JDBC
database connection in my job (on every incoming event I want to query
the database to get some data to enrich the event).  Because the code is
serialized and deserialized as it is sent from the flink master to the
flink workers, I cannot just open the connection in my main method.  Can
someone give me a pointer to the lifecycle methods that are called by
the worker to do local initialization of the job?  I have not yet been
able to find any references or examples of this in the documentation.

Thanks!

Best,
Bart
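
The lifecycle hooks being asked about are the open()/close() methods on
Flink's rich functions (RichMapFunction, RichFlatMapFunction, etc.):
open() is called once per parallel task instance on the worker, after
deserialization, so non-serializable resources like a JDBC connection can
be created there. The sketch below illustrates the pattern; the Event and
EnrichedEvent types, the connection URL, and the SQL are hypothetical
placeholders:

```scala
import java.sql.{Connection, DriverManager}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical event types for illustration.
case class Event(id: String)
case class EnrichedEvent(event: Event, extra: String)

class EnrichFromDb extends RichMapFunction[Event, EnrichedEvent] {
  // @transient: the connection is created on the worker, not serialized.
  @transient private var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    // Called once per parallel task instance, on the worker.
    conn = DriverManager.getConnection("jdbc:postgresql://db:5432/enrich")
  }

  override def map(event: Event): EnrichedEvent = {
    val stmt = conn.prepareStatement("SELECT extra FROM lookup WHERE id = ?")
    stmt.setString(1, event.id)
    val rs = stmt.executeQuery()
    val extra = if (rs.next()) rs.getString(1) else null
    stmt.close()
    EnrichedEvent(event, extra)
  }

  override def close(): Unit = {
    // Called on shutdown; release the resource acquired in open().
    if (conn != null) conn.close()
  }
}
```

Usage would be e.g. stream.map(new EnrichFromDb).  Note that a blocking
JDBC call per event limits throughput; depending on the Flink version,
async I/O or a local cache may be worth considering for enrichment
lookups.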