Which, looking at your message again, would imply that the configuredKafkaCommit() method shouldn't inspect isolation.level.
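
As a rough sketch of that idea (not the actual Beam code: the class name, the inlined key constant, and the plain Map standing in for getConsumerConfig() are all made up for illustration), the check would look only at auto-commit and ignore the isolation level:

```java
import java.util.HashMap;
import java.util.Map;

class CommitCheckSketch {
  // Inlined value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG so the sketch
  // compiles without the Kafka client on the classpath.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  // Hypothetical variant of configuredKafkaCommit(): isolation.level is not
  // consulted, because read_committed only filters what the consumer reads
  // and does not commit offsets.
  static boolean configuredKafkaCommit(Map<String, Object> consumerConfig) {
    return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
  }

  public static void main(String[] args) {
    Map<String, Object> readCommittedOnly = new HashMap<>();
    readCommittedOnly.put("isolation.level", "read_committed");

    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put(ENABLE_AUTO_COMMIT, true);

    System.out.println(configuredKafkaCommit(readCommittedOnly));
    System.out.println(configuredKafkaCommit(autoCommit));
  }
}
```

With a check like this, a config that only sets read_committed would no longer count as "Kafka commits the offsets", so commitOffsetsInFinalize() would not be skipped on that basis.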

On Mon, Aug 8, 2022 at 3:27 PM John Casey <johnjca...@google.com> wrote:

> .withReadCommitted() doesn't commit messages when read, it instead
> specifies that the kafka consumer should only read messages that have
> themselves been committed to kafka.
>
> Its use is for exactly once applications.
>
> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <balazs.nem...@aliz.ai>
> wrote:
>
>> I have been reading from Kafka and trying to figure out which offset
>> management would be the best for my use-case. During that I noticed
>> something odd.
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>
>> private boolean configuredKafkaCommit() {
>>   return getConsumerConfig().get("isolation.level") == "read_committed"
>>       || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>> }
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>
>> The name of the method, and how it's being used in the code certainly
>> suggest that using read_committed isolation level handles and commits
>> kafka offsets. Seemed strange, but I'm not a Kafka pro, so let's test it.
>> Well it does not.
>>
>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>> - using ONLY commitOffsetsInFinalize() does commit it
>>
>> - using ONLY withReadCommitted() does NOT commit it
>>
>> Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read
>>
>> So is it a bug, or what am I missing here?
>>
>> If it is indeed a bug, then is it with the read_committed (so it should
>> commit it although found no explicit documentation about that anywhere), or
>> having that isolation level shouldn't prefer the commit in the finalize and
>> that method is wrong?