[ 
https://issues.apache.org/jira/browse/BEAM-6991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raghu Angadi resolved BEAM-6991.
--------------------------------
       Resolution: Not A Problem
    Fix Version/s: Not applicable

I am closing this as 'Working as Intended'. Please reopen it if you need more 
information.

> EOS: Streaming job fails on job restart with withEOS specified
> --------------------------------------------------------------
>
>                 Key: BEAM-6991
>                 URL: https://issues.apache.org/jira/browse/BEAM-6991
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.9.0, 2.11.0
>            Reporter: Anton Lytvynenko
>            Assignee: Alexey Romanenko
>            Priority: Critical
>             Fix For: Not applicable
>
>
> According to the 
> [documentation|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?],
>  the *'transactional.id'* should stay the same across producer restarts.
> In Beam, the *'transactional.id'* is defined under the hood in 
> *org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter* as follows:
>  
> {code:java}
> String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
> ...
> Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
> producerConfig.putAll(
>     ImmutableMap.of(
>         ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
>         ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
> ...{code}
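>
> For illustration (my own example, not part of the Beam source): with shard 0 and a constant sinkGroupId like the one used below, the format above produces the same transactional.id on every run:
> {code:java}
> // Hypothetical values, for illustration only.
> String producerName = String.format("producer_%d_for_%s", 0, "myWriterSinkGroupId");
> // producerName == "producer_0_for_myWriterSinkGroupId" -- reused as transactional.id on restart
> {code}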
>  
> So, to keep it consistent after a job restart (i.e. the same value this writer 
> used on the previous run), I need to configure the KafkaIO writer with a 
> constant *'sinkGroupId'*:
>  
> {code:java}
> .withEOS(numShards, "myWriterSinkGroupId");{code}
>  
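> For reference, a minimal sketch of how such a writer is configured (the topic, brokers, and serializers are placeholder values for illustration; only the .withEOS(...) call is from my actual configuration):
> {code:java}
> // Sketch only: "broker-1:9092" and "output-topic" are made-up placeholders,
> // StringSerializer is org.apache.kafka.common.serialization.StringSerializer,
> // and 'records' stands for a streaming PCollection<KV<String, String>>.
> records.apply(
>     KafkaIO.<String, String>write()
>         .withBootstrapServers("broker-1:9092")
>         .withTopic("output-topic")
>         .withKeySerializer(StringSerializer.class)
>         .withValueSerializer(StringSerializer.class)
>         .withEOS(numShards, "myWriterSinkGroupId"));
> {code}
>  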
> But when I restart the job after it was canceled, I get the following exception:
> {code:java}
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
> org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
> org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
> org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
> org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
> org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
> org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
> org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
> org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294){code}
> That basically says I need to change the *'sinkGroupId'* to something 
> different, but if I change it and rerun the job, I get duplicated messages in 
> the destination topic. 
> In other words, it breaks the exactly-once message delivery guarantee.
> My project uses Beam 2.9.0, but I tried 2.11.0 as well and the behavior is 
> the same.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
