Yes, we are resuming from Samza's last commit. But the problem is that the last commit covered data that was still buffered in the window and had not been completely processed.
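To make that concrete, here is an illustrative timeline for Scenario 2 below (timestamps are assumed for illustration, not measured):

  t=0s     'g', 'h', 'i', 'j' are read from Kafka and buffered in the open 30 sec fixed window
  t=2s     the task.commit.ms timer fires; Samza checkpoints the input offsets up through 'j'
  t=10s    the app is terminated; the window never fired, so the buffered records were never summed or written to the output topic
  restart  Samza resumes from the checkpointed offsets, so 'g', 'h', 'i', 'j' are skipped and the buffered window contents are gone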
From: Lukasz Cwik <lc...@google.com>
Date: Wednesday, July 10, 2019 at 11:07 AM
To: dev <d...@beam.apache.org>
Cc: "LeVeck, Matt" <matt_lev...@intuit.com>, "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>, "Cesar, Scott" <scott_ce...@intuit.com>, "Ho, Tom" <tom...@intuit.com>, "dev@samza.apache.org" <dev@samza.apache.org>
Subject: Re: Beam/Samza Ensuring At Least Once semantics

When you restart the application, are you resuming it from Samza's last commit? Since the exception is thrown after the GBK, all the data could be read from Kafka, forwarded to the GBK operator inside of Samza, and checkpointed in Kafka before the exception is ever thrown.

On Tue, Jul 9, 2019 at 8:34 PM Benenson, Mikhail <mikhail_benen...@intuit.com> wrote:

Hi,

I have run a few experiments to verify whether 'at least once' processing is guaranteed on Beam 2.13.0 with the Samza runner 1.1.0.

The Beam application is a slightly modified streaming Word Count from the Beam examples:
* read strings from the input Kafka topic, print (topic, partition, offset, value)
* convert values to pairs (value, 1)
* group in fixed windows with a duration of 30 sec
* sum per key
* throw an exception if the key starts with 'm'
* write (key, sum) to the output Kafka topic

I tried KafkaIO.read() with and without commitOffsetsInFinalize(); there is no difference in the results. Please see the source code attached; a sketch of the pipeline appears after the environment notes below.

Environment:
* Run with local zk & kafka; pre-create the input & output topics with 1 partition.
* samza.properties contains "task.commit.ms=2000". According to the Samza docs, "this property determines how often a checkpoint is written. The value is the time between checkpoints, in milliseconds". See http://samza.apache.org/learn/documentation/latest/jobs/samza-configurations.html#checkpointing.

Please see the samza config file and run script attached.
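For reference, a minimal Java sketch of the pipeline described above, reconstructed from the prose since the attachment is not reproduced here. The topic names, the bootstrap server address, and the class name are assumptions:

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.kafka.common.serialization.LongSerializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.apache.kafka.common.serialization.StringSerializer;
  import org.joda.time.Duration;

  public class FailingWindowedCount {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

      p.apply(KafkaIO.<String, String>read()
              .withBootstrapServers("localhost:9092")   // assumed local broker
              .withTopic("beam-input")                  // assumed topic name
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .commitOffsetsInFinalize())               // tried with and without
          // log (topic, partition, offset, value) and emit (value, 1)
          .apply("LogAndPair", ParDo.of(new DoFn<KafkaRecord<String, String>, KV<String, Long>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              KafkaRecord<String, String> r = c.element();
              System.out.printf("read from topic=%s, part=%d, offset=%d, val: %s%n",
                  r.getTopic(), r.getPartition(), r.getOffset(), r.getKV().getValue());
              c.output(KV.of(r.getKV().getValue(), 1L));
            }
          }))
          // group in 30-second fixed windows and sum per key
          .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(30))))
          .apply(Sum.longsPerKey())
          // simulate an application error for keys starting with 'm'
          .apply("FailOnM", ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              if (c.element().getKey().startsWith("m")) {
                throw new RuntimeException("simulated error on key: " + c.element().getKey());
              }
              c.output(c.element());
            }
          }))
          .apply(KafkaIO.<String, Long>write()
              .withBootstrapServers("localhost:9092")
              .withTopic("beam-output")                 // assumed topic name
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(LongSerializer.class));

      p.run().waitUntilFinish();
    }
  }

Run with --runner=SamzaRunner and the samza.properties mentioned above; the FailOnM transform is what reproduces the crash in Scenario 1.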
Scenario 1: Exception in transformation

Run
* Write 'a', 'b', 'c', 'm', 'd', 'e' into the input topic.
* Start the Beam app.
* Verify that the app log contains "read from topic=XXX, part=0, offset=100, val: e". Because the input topic has only one partition, this means all the data has been read from Kafka.
* Wait until the app terminates because of the exception thrown while processing 'm'.

Expectation
The order of processing after grouping is not specified, so some data could be written to the output topic before the application terminates, but value 'm' with offset 98 and all later records must NOT be marked as processed. So if I restart the Beam app, I expect it to throw the exception again when processing value 'm'.

Comment: throwing an exception in a transformation is not a good idea, but such an exception could be the result of an application error. So the expectation is that after fixing the error and restarting the Beam app, it should process the record that caused the error.

Results
After I restarted the app, it did NOT re-process value 'm' and did not throw an exception. If I add a new value 'f' into the input topic, I see "read from topic=XXX, part=0, offset=101, val: f", and after some time I see 'f' in the output topic. So the record with value 'm' was NOT processed.

Scenario 2: App termination

Run
* Write 'g', 'h', 'i', 'j' into the input topic.
* Start the Beam app.
* Verify that the app log contains "read from topic=XXX, part=0, offset=105, val: j". Because the input topic has only one partition, this means all the data has been read from Kafka.
* Wait about 10 sec, then terminate the Beam app. The idea is to terminate the app while 'g', 'h', 'i', 'j' are waiting in the 30 sec fixed window, but after task.commit.ms=2000 has passed, so the offsets are committed.

Expectation
As records 'g', 'h', 'i', 'j' are NOT processed, I expect that after the app is restarted, it again reads 'g', 'h', 'i', 'j' from the input topic and processes these records.

Results
After I restarted the app, it did NOT re-process the values 'g', 'h', 'i', 'j'. If I add a new value 'k' into the input topic, I see "read from topic=XXX, part=0, offset=106, val: k", and after some time I see 'k' in the output topic. So the records with values 'g', 'h', 'i', 'j' were NOT processed.

Based on these results I am inclined to conclude that Beam with the Samza runner does NOT provide an 'at least once' processing guarantee. Have I missed something?

------------------
Michael Benenson

From: "LeVeck, Matt" <matt_lev...@intuit.com>
Date: Monday, July 1, 2019 at 5:28 PM
To: "Deshpande, Omkar" <omkar_deshpa...@intuit.com>, "Benenson, Mikhail" <mikhail_benen...@intuit.com>, Xinyu Liu <xinyuliu...@gmail.com>, Xinyu Liu <xi...@linkedin.com>, Samarth Shetty <sshe...@linkedin.com>, "Audo, Nicholas" <nicholas_a...@intuit.com>
Subject: Beam/Samza Ensuring At Least Once semantics

We're seeing some behavior when using Beam's KafkaIO and Samza as the runner that suggests checkpoints are getting committed even when an error gets thrown in the Beam pipeline while processing a batch. Do you all have a recommended set of settings/patterns for using Beam with Samza to ensure that checkpoints are only updated after successful processing (i.e. the transforms succeed and the message is sent to the Beam pipeline's final output sink)?

Our current settings for Samza are:

task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.shutdown.ms=10000
task.commit.ms=2000

Nothing is specified with regard to checkpointing at the Beam level.

Thanks,
Matt
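For completeness, a minimal samza.properties sketch consistent with the settings quoted in this thread; the system-definition lines and endpoints are assumptions for a local run (the actual attached config is not reproduced here):

  # checkpointing, as quoted above
  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
  task.checkpoint.system=kafka
  task.shutdown.ms=10000
  task.commit.ms=2000

  # assumed: the "kafka" system that the checkpoint manager writes to must be defined
  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  systems.kafka.consumer.zookeeper.connect=localhost:2181
  systems.kafka.producer.bootstrap.servers=localhost:9092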