Thanks, see https://github.com/apache/beam/issues/22631 and https://github.com/apache/beam/pull/22633
John Casey via dev <[email protected]> wrote (on Mon, Aug 8, 2022, 21:30):

> Which, looking at your message again, would imply that the
> configuredKafkaCommit() method shouldn't inspect isolation.level.
>
> On Mon, Aug 8, 2022 at 3:27 PM John Casey <[email protected]> wrote:
>
>> .withReadCommitted() doesn't commit messages when read; it instead
>> specifies that the Kafka consumer should only read messages that have
>> themselves been committed to Kafka.
>>
>> Its use is for exactly-once applications.
>>
>> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <[email protected]>
>> wrote:
>>
>>> I have been reading from Kafka and trying to figure out which offset
>>> management would be the best for my use case. During that I noticed
>>> something odd.
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>>
>>>   private boolean configuredKafkaCommit() {
>>>     return getConsumerConfig().get("isolation.level") == "read_committed"
>>>         || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>>   }
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>>
>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>>
>>> The name of the method, and how it's being used in the code, certainly
>>> suggest that using the read_committed isolation level handles and commits
>>> Kafka offsets. Seemed strange, but I'm not a Kafka pro, so let's test it.
>>> Well, it does not.
>>>
>>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>>> - using ONLY commitOffsetsInFinalize() does commit it
>>> - using ONLY withReadCommitted() does NOT commit it
>>>
>>> Tested on Dataflow, 2.40.0 Java SDK, without explicitly enabling SDF-read.
>>>
>>> So is it a bug, or what am I missing here?
>>>
>>> If it is indeed a bug, then is it with read_committed (so it should
>>> commit, although I found no explicit documentation about that anywhere), or
>>> is it that having that isolation level shouldn't take precedence over the
>>> commit in finalize, and that method is wrong?
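For reference, here is a minimal standalone sketch of the point made in the thread: a check for "Kafka itself commits offsets" that treats isolation.level=read_committed as a commit mechanism answers true for a read_committed-only consumer, while a check that looks only at enable.auto.commit matches the test results quoted above. This is not the Beam code and not necessarily what the linked PR does. The class and method names (CommitCheckSketch, quotedCheck, autoCommitOnlyCheck) are made up for illustration, a plain Map stands in for the consumer config, and equals() is used in place of the quoted == string comparison. Only the property names and values are real Kafka settings.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitCheckSketch {

    // Real Kafka consumer property names.
    static final String ISOLATION_LEVEL = "isolation.level";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Models the intent of the quoted check: read_committed OR auto-commit.
    // (Uses equals() instead of == so the string comparison is by value.)
    static boolean quotedCheck(Map<String, Object> consumerConfig) {
        return "read_committed".equals(consumerConfig.get(ISOLATION_LEVEL))
            || Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
    }

    // Hypothetical revision implied by the thread: only auto-commit means
    // that the Kafka consumer commits offsets on its own.
    static boolean autoCommitOnlyCheck(Map<String, Object> consumerConfig) {
        return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
    }

    public static void main(String[] args) {
        Map<String, Object> readCommittedOnly = new HashMap<>();
        readCommittedOnly.put(ISOLATION_LEVEL, "read_committed");

        Map<String, Object> autoCommitOnly = new HashMap<>();
        autoCommitOnly.put(ENABLE_AUTO_COMMIT, true);

        // The quoted logic reports "commits" for read_committed alone,
        // although the test in the thread showed no offsets were committed.
        System.out.println(quotedCheck(readCommittedOnly));         // true
        // The auto-commit-only check agrees with the observed behaviour.
        System.out.println(autoCommitOnlyCheck(readCommittedOnly)); // false
        System.out.println(autoCommitOnlyCheck(autoCommitOnly));    // true
    }
}
```

The sketch only models the boolean decision. Whether offsets actually get committed still depends on enable.auto.commit or commitOffsetsInFinalize(), as the three test cases in the thread show.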
