Thanks for the quick turnaround on this.

On Mon, Aug 8, 2022 at 9:34 PM Balázs Németh <balazs.nem...@aliz.ai> wrote:
> thanks, see https://github.com/apache/beam/issues/22631 +
> https://github.com/apache/beam/pull/22633
>
> John Casey via dev <dev@beam.apache.org> wrote (on Mon, Aug 8, 2022,
> 21:30):
>
>> Which, looking at your message again, would imply that the
>> configuredKafkaCommit() method shouldn't inspect isolation.level
>>
>> On Mon, Aug 8, 2022 at 3:27 PM John Casey <johnjca...@google.com> wrote:
>>
>>> .withReadCommitted() doesn't commit messages when read; it instead
>>> specifies that the Kafka consumer should only read messages that have
>>> themselves been committed to Kafka.
>>>
>>> Its use is for exactly-once applications.
>>>
>>> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <balazs.nem...@aliz.ai>
>>> wrote:
>>>
>>>> I have been reading from Kafka and trying to figure out which offset
>>>> management would be the best for my use-case. During that I noticed
>>>> something odd.
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>>>
>>>> private boolean configuredKafkaCommit() {
>>>>   return getConsumerConfig().get("isolation.level") == "read_committed"
>>>>       || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>>> }
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>>>
>>>> The name of the method, and how it's being used in the code, certainly
>>>> suggest that using the read_committed isolation level handles and commits
>>>> Kafka offsets. Seemed strange, but I'm not a Kafka pro, so let's test it.
>>>> Well, it does not.
>>>>
>>>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>>>> - using ONLY commitOffsetsInFinalize() does commit it
>>>> - using ONLY withReadCommitted() does NOT commit it
>>>>
>>>> Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
>>>>
>>>> So is it a bug, or what am I missing here?
>>>>
>>>> If it is indeed a bug, then is it with the read_committed (so it should
>>>> commit it, although I found no explicit documentation about that
>>>> anywhere), or having that isolation level shouldn't prefer the commit in
>>>> the finalize and that method is wrong?
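
To make the point from the thread concrete, here is a minimal, self-contained sketch of the two checks being discussed. It is an illustration only, not the code from the linked PR: the class name CommitCheckSketch and both method names are hypothetical, and a plain Map stands in for the consumer config. Note that it compares strings with equals() rather than ==, which differs from the snippet quoted above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of KafkaIO. A plain Map stands in for the
// Kafka consumer config, keyed by the standard consumer property names.
public class CommitCheckSketch {
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    static final String ISOLATION_LEVEL = "isolation.level";

    // Mirrors the logic of the check quoted in the thread: read_committed
    // OR auto-commit counts as "Kafka commits offsets". Uses equals()
    // instead of == for the string comparison.
    static boolean checkWithIsolationLevel(Map<String, Object> config) {
        return "read_committed".equals(config.get(ISOLATION_LEVEL))
            || Boolean.TRUE.equals(config.get(ENABLE_AUTO_COMMIT));
    }

    // The alternative suggested in the thread: isolation.level is not
    // inspected, because it only filters what is read and commits nothing.
    static boolean checkAutoCommitOnly(Map<String, Object> config) {
        return Boolean.TRUE.equals(config.get(ENABLE_AUTO_COMMIT));
    }

    public static void main(String[] args) {
        Map<String, Object> readCommittedOnly = new HashMap<>();
        readCommittedOnly.put(ISOLATION_LEVEL, "read_committed");

        Map<String, Object> autoCommitOnly = new HashMap<>();
        autoCommitOnly.put(ENABLE_AUTO_COMMIT, Boolean.TRUE);

        System.out.println(checkWithIsolationLevel(readCommittedOnly)); // true
        System.out.println(checkAutoCommitOnly(readCommittedOnly));     // false
        System.out.println(checkAutoCommitOnly(autoCommitOnly));        // true
    }
}
```

With only read_committed set, the first check reports that offsets are being committed even though, per the test results reported above, nothing is; the second check does not.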