Hi Vasily,

As far as I know, by default the console consumer reads uncommitted messages.
Try setting isolation.level to read_committed in the console-consumer
properties.
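
For what it's worth, here is a minimal sketch (broker, group and topic names are
placeholders) of a plain Java consumer configured the same way; with
kafka-console-consumer the equivalent is, if I recall correctly, passing
--consumer-property isolation.level=read_committed on the command line:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");    // placeholder
        props.setProperty("group.id", "read-committed-check");    // placeholder
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only return messages from committed transactions.
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));  // the job's output topic
            while (true) {
                consumer.poll(1000).forEach(r -> System.out.println(r.value()));
            }
        }
    }
}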

Hope this helps,
Maxim.

On Fri, Aug 2, 2019 at 12:42 PM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi, Eduardo.
> Maybe I should describe the experiment design more precisely:
> 1) I run Flink on YARN (YARN Session method).
> 2) I do not stop/cancel the application, I just kill the TaskManager process.
> 3) After that, YARN creates another TaskManager process and an automatic
> checkpoint restore from HDFS happens.
>
> That's why I expect to see a correct restore.
>
> Best regards,
> Vasily Melnik
>
> GlowByte Consulting <http://www.gbconsulting.ru/>
>
> ===================
>
> Mobile: +7 (903) 101-43-71
> vasily.mel...@glowbyteconsulting.com
>
>
> On Fri, 2 Aug 2019 at 13:04, Eduardo Winpenny Tejedor <
> eduardo.winpe...@gmail.com> wrote:
>
>> Hi Vasily,
>>
>> You're probably executing this from your IDE or from a local Flink
>> cluster without starting your job from a checkpoint.
>>
>> When you start your Flink job for the second time you need to specify the
>> path to the latest checkpoint as an argument, otherwise Flink will start
>> from scratch.
>>
>> You're probably thinking that's not great: ideally Flink should be able to
>> continue automatically from the last produced checkpoint, and that is indeed
>> what the docs say. However, that only applies when you're running in a proper
>> cluster environment. Flink can recover from checkpoints on its own when only
>> part of the cluster fails, not when the whole job is stopped. For full stops
>> you need to specify the checkpoint manually.
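>>
>> As an illustration only (a minimal sketch, not taken from your job): in Flink 1.8
>> you can retain checkpoints when the job is cancelled and then resume from one
>> explicitly, e.g. via flink run -s <checkpoint-path>:
>>
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
>> // Keep completed checkpoints around so they can be used for a manual restore.
>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> // Later, restart from a retained checkpoint directory (path is only an example):
>> //   flink run -s hdfs:///checkpoints-data/<job-id>/chk-42 your-job.jar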
>>
>> Hope that helps!
>>
>>
>> On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <
>> vasily.mel...@glowbyteconsulting.com> wrote:
>>
>>> I'm trying to test Flink 1.8.0 exactly-once semantics with a Kafka source
>>> and sink:
>>>
>>>    1. Run the Flink app, simply transferring messages from one topic to
>>>    another with parallelism=1 and a checkpoint interval of 20 seconds.
>>>    2. Generate messages with incrementing integer numbers using a Python
>>>    script, one every 2 seconds (a rough Java equivalent is sketched after
>>>    this list).
>>>    3. Read the output topic with the console consumer at the read_committed
>>>    isolation level.
>>>    4. Manually kill the TaskManager.
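>>>
>>> (For step 2, since the Python script isn't shown, here is a rough Java
>>> equivalent; the broker address is a placeholder, the input topic name is
>>> taken from the job code below.)
>>>
>>> import java.util.Properties;
>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>
>>> public class IncrementingGenerator {
>>>     public static void main(String[] args) throws InterruptedException {
>>>         Properties props = new Properties();
>>>         props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
>>>         props.setProperty("key.serializer",
>>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>>         props.setProperty("value.serializer",
>>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>>>             for (int i = 0; ; i++) {
>>>                 producer.send(new ProducerRecord<>("stringTopic1", Integer.toString(i)));
>>>                 Thread.sleep(2000);   // one message every 2 seconds
>>>             }
>>>         }
>>>     }
>>> }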
>>>
>>> I expect to see monotonically increasing integers in the output topic
>>> regardless of the TaskManager being killed and recovered.
>>>
>>> But actually I see something unexpected in the console-consumer output:
>>>
>>> 32
>>> 33
>>> 34
>>> 35
>>> 36
>>> 37
>>> 38
>>> 39
>>> 40
>>> -- TaskManager Killed
>>> 32
>>> 34
>>> 35
>>> 36
>>> 40
>>> 41
>>> 46
>>> 31
>>> 33
>>> 37
>>> 38
>>> 39
>>> 42
>>> 43
>>> 44
>>> 45
>>>
>>> It looks like all messages between checkpoints were replayed to the output
>>> topic. Also, I expected to see results in the output topic only after
>>> checkpointing, i.e. every 20 seconds, but messages appeared in the output
>>> immediately as they were sent to the input.
>>> Is this the correct behaviour, or am I doing something wrong?
>>>
>>> Kafka version is 1.0.1 from Cloudera parcels. I tested the Kafka transactional
>>> producer and a read-committed console consumer with custom code, and it worked
>>> perfectly well, reading messages only after commitTransaction on the producer.
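>>>
>>> (For reference, a minimal sketch of that kind of manual test; broker, topic
>>> and transactional.id values are placeholders, not the exact custom code used.)
>>>
>>> import java.util.Properties;
>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>
>>> Properties props = new Properties();
>>> props.setProperty("bootstrap.servers", "broker:9092");    // placeholder
>>> props.setProperty("transactional.id", "manual-test-tx");  // placeholder
>>> props.setProperty("enable.idempotence", "true");
>>> props.setProperty("key.serializer",
>>>         "org.apache.kafka.common.serialization.StringSerializer");
>>> props.setProperty("value.serializer",
>>>         "org.apache.kafka.common.serialization.StringSerializer");
>>>
>>> try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>>>     producer.initTransactions();
>>>     producer.beginTransaction();
>>>     // Not visible to a read_committed consumer until the commit below.
>>>     producer.send(new ProducerRecord<>("test", "42"));
>>>     producer.commitTransaction();
>>> }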
>>>
>>> My Flink code:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.getConfig().setAutoWatermarkInterval(1000);
>>> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
>>> env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
>>>
>>> Properties producerProperty = new Properties();
>>> producerProperty.setProperty("bootstrap.servers", ...);
>>> producerProperty.setProperty("zookeeper.connect", ...);
>>> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
>>> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
>>> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>>>
>>> Properties consumerProperty = new Properties();
>>> consumerProperty.setProperty("bootstrap.servers", ...);
>>> consumerProperty.setProperty("zookeeper.connect", ...);
>>> consumerProperty.setProperty("group.id", "test2");
>>>
>>> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>(
>>>         "stringTopic1", new ComplexStringSchema(), consumerProperty);
>>> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>>>
>>> FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>(
>>>         "test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
>>>         producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>> producer1.ignoreFailuresAfterTransactionTimeout();
>>>
>>> DataStreamSource<String> s1 = env.addSource(consumer1);
>>> s1.addSink(producer1);
>>> env.execute("Test");
>>>
>>>
>>>
