Hi Vasily, as far as I know, the console consumer reads uncommitted messages by default. Try setting isolation.level to read_committed in the console-consumer properties.
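For example, something like this (broker address and topic name are placeholders for your own setup):

```shell
# Read only messages from committed transactions (broker/topic are placeholders).
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test \
  --consumer-property isolation.level=read_committed
```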
Hope this helps,
Maxim

On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote:

> Hi, Eduardo.
> Maybe I should describe the experiment design more precisely:
> 1) I run Flink on YARN (YARN session method).
> 2) I do not stop/cancel the application, I just kill the TaskManager process.
> 3) After that, YARN creates another TaskManager process and an automatic
> checkpoint restore from HDFS happens.
>
> That's why I expect to see a correct restore.
>
> Best regards,
> Vasily Melnik
>
> *Glow**Byte Consulting* <http://www.gbconsulting.ru/>
>
> ===================
>
> Mobile: +7 (903) 101-43-71
> vasily.mel...@glowbyteconsulting.com
>
>
> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> You're probably executing this from your IDE or from a local Flink
>> cluster without starting your job from a checkpoint.
>>
>> When you start your Flink job for the second time, you need to specify
>> the path to the latest checkpoint as an argument, otherwise Flink will
>> start from scratch.
>>
>> You're probably thinking that's not great, that ideally Flink should be
>> able to automatically continue from the last produced checkpoint, and
>> that's actually what the docs say. Well, that only applies when you're
>> running in a proper cluster environment. Flink is able to recover using
>> checkpoints when only part of the cluster fails, not when the whole job
>> is stopped. For full stops you need to specify the checkpoint manually.
>>
>> Hope that helps!
>>
>>
>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik <vasily.mel...@glowbyteconsulting.com> wrote:
>>
>>> I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka
>>> source and sink:
>>>
>>> 1. Run the Flink app, simply transferring messages from one topic to
>>>    another with parallelism=1 and a checkpoint interval of 20 seconds.
>>> 2. Generate messages with incrementing integer numbers using a Python
>>>    script every 2 seconds.
>>> 3. Read the output topic with a console consumer in read_committed
>>>    isolation level.
>>> 4. Manually kill the TaskManager.
>>>
>>> I expect to see monotonically increasing integers in the output topic
>>> regardless of the TaskManager being killed and recovered.
>>>
>>> But I actually see something unexpected in the console-consumer output:
>>>
>>> 32
>>> 33
>>> 34
>>> 35
>>> 36
>>> 37
>>> 38
>>> 39
>>> 40
>>> -- TaskManager Killed
>>> 32
>>> 34
>>> 35
>>> 36
>>> 40
>>> 41
>>> 46
>>> 31
>>> 33
>>> 37
>>> 38
>>> 39
>>> 42
>>> 43
>>> 44
>>> 45
>>>
>>> It looks like all messages between checkpoints were replayed in the
>>> output topic. I also expected to see results in the output topic only
>>> after checkpointing, i.e. every 20 seconds, but messages appeared in
>>> the output immediately as they were sent to the input.
>>> Is this the correct behaviour, or am I doing something wrong?
>>>
>>> Kafka version is 1.0.1 from Cloudera parcels. I tested the Kafka
>>> transactional producer and a read_committed console consumer with
>>> custom code, and it worked perfectly well, reading messages only after
>>> commitTransaction on the producer.
>>>
>>> My Flink code:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.getConfig().setAutoWatermarkInterval(1000);
>>> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
>>>
>>> Properties producerProperty = new Properties();
>>> producerProperty.setProperty("bootstrap.servers", ...);
>>> producerProperty.setProperty("zookeeper.connect", ...);
>>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
>>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
>>> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>>>
>>> Properties consumerProperty = new Properties();
>>> consumerProperty.setProperty("bootstrap.servers", ...);
>>> consumerProperty.setProperty("zookeeper.connect", ...);
>>> consumerProperty.setProperty("group.id", "test2");
>>>
>>> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>(
>>>     "stringTopic1", new ComplexStringSchema(), consumerProperty);
>>> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>>
>>> FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>(
>>>     "test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
>>>     producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>> producer1.ignoreFailuresAfterTransactionTimeout();
>>>
>>> DataStreamSource<String> s1 = env.addSource(consumer1);
>>> s1.addSink(producer1);
>>> env.execute("Test");
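On Eduardo's point about full stops: resuming a fully stopped job from a retained checkpoint has to be done explicitly with the -s flag. A sketch, where the checkpoint path, job id, chk number, and jar name are all placeholders:

```shell
# Resume a stopped job from a retained checkpoint's metadata file
# (all paths below are placeholders for your own deployment).
flink run -s hdfs:///checkpoints-data/<job-id>/chk-42/_metadata my-job.jar
```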