[ 
https://issues.apache.org/jira/browse/BEAM-7047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-7047:
--------------------------------
    Labels: stale-P2  (was: )

> Direct Runner loses checkpoint marks when committing offsets to Apache Kafka 
> partitions using KafkaIO
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7047
>                 URL: https://issues.apache.org/jira/browse/BEAM-7047
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-direct
>    Affects Versions: 2.9.0
>            Reporter: Mark Norkin
>            Priority: P2
>              Labels: stale-P2
>         Attachments: 
> beam-direct-runner-not-finalize-all-kafka-offsets-master.zip
>
>
> In our project, we use Apache Kafka as a source for most of our Apache 
> Beam pipelines.
> We would like to use the manual offset commit functionality implemented by 
> KafkaIO and enabled by the _commitOffsetsInFinalize_ option.
> We have also written several tests that document this functionality and 
> should run during our CI process using the Direct Runner.
> However, we ran into issues while implementing the tests: on the Direct 
> Runner, not all checkpoint marks (which, in the case of KafkaIO, represent 
> partition offsets) are committed.
> I've created a sample project, attached to this JIRA ticket as an external 
> link and as a source attachment, to showcase the issue. 
> The result of this test is *_not_* deterministic; when it fails, a 
> representative stack trace is as follows:
> {code:java}
> org.awaitility.core.ConditionTimeoutException: Condition with alias 'sent raw 
> messages are read and offsets are committed' didn't complete within 3 minutes 
> because lambda expression in 
> com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest: 
> expected <{raw_topic-0=10, raw_topic-1=10, raw_topic-2=10}> but was 
> <{raw_topic-1=10, raw_topic-0=10}>.
>       at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:145)
>       at org.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:89)
>       at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:902)
>       at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:645)
>       at com.marknorkin.beam.directrunner.sample.ParserEndToEndFlowCommitOffsetsTest.shouldTestOffsetCommit(ParserEndToEndFlowCommitOffsetsTest.java:138)
> {code}
>  
> This issue is probably not specific to KafkaIO, as the Direct Runner works 
> against the general CheckpointMark interface when finalizing checkpoint marks.
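
The closing remark in the description concerns the general CheckpointMark
contract: a source only commits its progress when the runner calls
finalizeCheckpoint() on the mark it handed back. The following is a minimal
sketch of that contract, using hypothetical stand-in types (CheckpointMark,
PartitionMark, run) rather than Beam's or KafkaIO's actual classes. It does
not model why the Direct Runner would skip a mark; it only shows that one
unfinalized mark is enough to leave a partition without a committed offset,
which is the shape of the mismatch in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration only; these are NOT Beam's classes.
class CheckpointFinalizeSketch {

    /** Simplified analogue of a checkpoint mark: progress is durable only once finalized. */
    interface CheckpointMark {
        void finalizeCheckpoint();
    }

    /** Analogue of a per-partition mark that commits its offset when finalized. */
    static final class PartitionMark implements CheckpointMark {
        private final String partition;
        private final long offset;
        private final Map<String, Long> committed;

        PartitionMark(String partition, long offset, Map<String, Long> committed) {
            this.partition = partition;
            this.offset = offset;
            this.committed = committed;
        }

        @Override
        public void finalizeCheckpoint() {
            // Stands in for committing the offset back to the broker.
            committed.put(partition, offset);
        }
    }

    /**
     * Toy "runner": creates one mark per partition, finalizes only the first
     * finalizeCount of them, and returns the offsets that were committed.
     */
    static Map<String, Long> run(int partitions, long offset, int finalizeCount) {
        Map<String, Long> committed = new TreeMap<>();
        List<CheckpointMark> marks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            marks.add(new PartitionMark("raw_topic-" + p, offset, committed));
        }
        int limit = Math.min(finalizeCount, marks.size());
        for (int i = 0; i < limit; i++) {
            marks.get(i).finalizeCheckpoint();
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println("all marks finalized: " + run(3, 10L, 3));
        System.out.println("one mark dropped:    " + run(3, 10L, 2));
    }
}
```

In this sketch, a test that waits for all three partitions to report a
committed offset would only pass in the first case, so a check of this kind
is sensitive to any runner that does not finalize every mark it receives.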



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
