[ https://issues.apache.org/jira/browse/BEAM-6991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raghu Angadi resolved BEAM-6991.
--------------------------------
       Resolution: Not A Problem
    Fix Version/s: Not applicable

I am closing this as 'Working as Intended'. Please reopen if you need more information.

EOS: Streaming job fails on job restart with withEOS specified
--------------------------------------------------------------

                 Key: BEAM-6991
                 URL: https://issues.apache.org/jira/browse/BEAM-6991
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
    Affects Versions: 2.9.0, 2.11.0
            Reporter: Anton Lytvynenko
            Assignee: Alexey Romanenko
            Priority: Critical
             Fix For: Not applicable

According to the [documentation|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?], the *'transactional.id'* should stay the same across producer restarts.

In Beam, the *'transactional.id'* is derived under the hood in *org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter* as follows:

{code:java}
String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
...
Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
producerConfig.putAll(
    ImmutableMap.of(
        ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
        ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
...{code}

So to keep it consistent across restarts (i.e. the same value this writer used on the previous run of the job), I configure the KafkaIO writer with a constant *'sinkGroupId'*:

{code:java}
.withEOS(numShards, "myWriterSinkGroupId");{code}
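For context, a minimal sketch of how such a writer can be wired up end to end; the broker address, topic name, serializers, and shard count below are illustrative placeholders, not values from my actual pipeline:

{code:java}
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

// 'records' is a keyed PCollection produced earlier in the pipeline.
PCollection<KV<String, String>> records = ...;

int numShards = 5; // illustrative shard count for the exactly-once writer

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")    // hypothetical broker
        .withTopic("destination-topic")           // hypothetical topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // sinkGroupId is kept constant so that the derived transactional.id
        // ("producer_<shard>_for_<sinkGroupId>") is the same on every run.
        .withEOS(numShards, "myWriterSinkGroupId"));
{code}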
When I restart the job after it was canceled, I get the following exception:

{code:java}
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id.
Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
        org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
        org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
        org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
        org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
        org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
        org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id.
Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - > 2019-04-02 16:05:26"}' > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34) > > org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown > Source) > > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275) > > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237) > > org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49) > > org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182) > > org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102) > > org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057) > > org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438) > > org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125) > > org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060) > org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930) > > org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368) > > org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94) > > org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42) > > org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115) > > org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73) > > org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80) > > org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44) > > org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201) > > org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159) > > org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143) > > org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard > 0, but there is no 
        org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
        org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294){code}

That basically says I need to change *'sinkGroupId'* to something different, but if I change it and rerun the job, I get duplicated messages in the destination topic. In other words, it breaks the exactly-once delivery guarantee.

My project uses Beam 2.9.0, but I tried 2.11.0 as well and the behavior is the same.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)