Hi John,

this depends on your checkpoint interval. When checkpointing is enabled, checkpoints are triggered periodically [1].
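The restart semantics can be sketched with a small, self-contained simulation (plain Java, not Flink APIs; the class and method names are made up for illustration). A checkpoint completes after every `interval` records; when the sink fails, the source rewinds to the offset recorded by the last completed checkpoint, so every record after that checkpoint is delivered again, which is exactly the at-least-once duplication discussed below.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (not Flink code): checkpoint every `interval` records,
// fail at offset `failAt`, restart from the last completed checkpoint,
// and observe which offsets the sink receives more than once.
public class CheckpointReplay {

    // Returns the sequence of offsets the sink receives across the
    // failed run and the restarted run.
    public static List<Integer> run(int totalRecords, int interval, int failAt) {
        List<Integer> sunk = new ArrayList<>();
        int lastCheckpoint = 0; // offset to resume from after a failure

        // First attempt: process until the sink fails at `failAt`.
        for (int offset = 0; offset < totalRecords; offset++) {
            if (offset == failAt) {
                break; // sink failure -> job restarts from lastCheckpoint
            }
            sunk.add(offset);
            // A checkpoint completes after every `interval` records.
            if ((offset + 1) % interval == 0) {
                lastCheckpoint = offset + 1;
            }
        }

        // Restart: the Kafka offset is reset to the last checkpoint, so
        // offsets in [lastCheckpoint, failAt) are delivered a second time.
        for (int offset = lastCheckpoint; offset < totalRecords; offset++) {
            sunk.add(offset);
        }
        return sunk;
    }

    public static void main(String[] args) {
        // 5 records, checkpoint interval 3, sink fails on the 5th
        // record (offset 4): offset 3 is replayed after the restart.
        System.out.println(run(5, 3, 4)); // prints [0, 1, 2, 3, 3, 4]
    }
}
```

With interval 2 the last checkpoint lands exactly at the failure point and nothing is duplicated; the shorter the interval, the less is replayed.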
Cheers,

Konstantin

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html

On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com> wrote:

> Ok, so just to be clear. Let's say we started at day 0...
>
> 1- Producer inserted 10 records into Kafka.
> 2- Kafka Flink Consumer consumed 5 records.
> 3- Some transformations applied to those records.
> 4- 4 records sinked, 1 failed.
> 5- Flink Job restarts because of the above failure.
>
> When does the checkpoint happen above?
> And does it mean in the above case that it will start back at 0, or will
> it start at the 4th record and continue, or wherever the checkpoint
> happened (for example, the 3rd record)?
> My stored proc will be idempotent, and I understand what to do if
> messages get replayed.
> Just want to try to understand when and where the checkpointing will
> happen.
>
> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>
>> Hi John,
>>
>> I think what Konstantin is trying to say is: Flink's Kafka consumer
>> does not start consuming from the Kafka committed offset when starting
>> the consumer; it actually starts with the offset that was last
>> checkpointed to the external DFS. (In other words, if checkpointing is
>> enabled, the starting point of the consumer has no relevance to the
>> Kafka committed offset whatsoever.)
>>
>> This is to quote:
>> "*the Flink Kafka Consumer does only commit offsets back to Kafka on a
>> best-effort basis after every checkpoint. Internally Flink "commits" the
>> [checkpoints]->[current Kafka offset] as part of its periodic
>> checkpoints.*"
>>
>> However, if you do not enable checkpointing, your consumer will by
>> default restart from the default Kafka offset (which I think is your
>> committed group offset).
>>
>> --
>> Rong
>>
>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:
>>
>>> So when we say a sink is at least once.
>>> It's because internally it's not checking any kind of state and it
>>> sends what it has regardless, correct?
>>> Cause I will build a sink that calls stored procedures.
>>>
>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf,
>>> <konstan...@ververica.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> In case of a failure (e.g. in the SQL sink), the Flink job will be
>>>> restarted from the last checkpoint. This means the offsets of all
>>>> Kafka partitions will be reset to that point in the stream, along
>>>> with the state of all operators. To enable checkpointing, you need
>>>> to call StreamExecutionEnvironment#enableCheckpointing(). If you are
>>>> using the JDBCSinkFunction (which is an at-least-once sink), the
>>>> output will be duplicated in the case of failures.
>>>>
>>>> To answer your questions:
>>>>
>>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>>> auto-commit).
>>>> * No, the Flink Kafka Consumer only commits offsets back to Kafka on
>>>> a best-effort basis after every checkpoint. Internally, Flink
>>>> "commits" the offsets as part of its periodic checkpoints.
>>>> * Yes, along with all other events between the last checkpoint and
>>>> the failure.
>>>> * It will continue from the last checkpoint.
>>>>
>>>> Hope this helps.
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, using Apache Flink 1.8.0.
>>>>>
>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>
>>>>> Properties props = new Properties();
>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>
>>>>> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
>>>>>     new SimpleStringSchema(), props);
>>>>>
>>>>> I do some JSON transforms and then push to my SQL database using
>>>>> JDBC and a stored procedure. Let's assume the SQL sink fails.
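Since the JDBC sink discussed above is at-least-once, the stored procedure has to tolerate replayed records. A minimal sketch of that idempotence, modeled in plain Java as an upsert keyed by a unique record id (the class name is hypothetical; a real stored procedure would typically use INSERT ... ON DUPLICATE KEY UPDATE or MERGE on a primary key):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch (not Flink code): an idempotent "stored procedure" modeled
// as an upsert keyed by a unique record id. Replaying records after a
// restart leaves the final table state unchanged.
public class IdempotentSink {
    private final Map<String, String> table = new LinkedHashMap<>();

    // Upsert: writing the same (id, value) twice has no extra effect.
    public void write(String id, String value) {
        table.put(id, value);
    }

    public int rowCount() {
        return table.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        // First attempt: records r1..r4 are written, then the job fails.
        for (String id : Arrays.asList("r1", "r2", "r3", "r4")) {
            sink.write(id, "payload-" + id);
        }
        // Restart from the last checkpoint: r3..r5 are replayed.
        for (String id : Arrays.asList("r3", "r4", "r5")) {
            sink.write(id, "payload-" + id);
        }
        // Despite r3 and r4 arriving twice, the table holds 5 rows.
        System.out.println(sink.rowCount()); // prints 5
    }
}
```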
>>>>>
>>>>> We know that Kafka can either periodically commit offsets, or it
>>>>> can be done manually based on the consumer's logic.
>>>>>
>>>>> - How are the source Kafka consumer offsets handled?
>>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>>> - Will that single event that failed be retried?
>>>>> - So if we had 5 incoming events and, say, it failed on the 3rd
>>>>> one, will it continue on the 3rd or will the job restart and retry
>>>>> those 5 events?
>>>>
>>>> --
>>>>
>>>> Konstantin Knauf | Solutions Architect
>>>>
>>>> +49 160 91394525
>>>>
>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>>
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen