I have been reading from Kafka and trying to figure out which offset
management strategy would be best for my use case. While doing so, I
noticed something odd.

https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362

    private boolean configuredKafkaCommit() {
      return getConsumerConfig().get("isolation.level") == "read_committed"
          || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
    }

It is used here:

https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334

The name of the method and the way it is used in the code certainly
suggest that the read_committed isolation level handles and commits Kafka
offsets. That seemed strange, but I'm not a Kafka pro, so I tested it. It
does not:

- using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG commits the offsets
- using ONLY commitOffsetsInFinalize() commits the offsets
- using ONLY withReadCommitted() does NOT commit the offsets
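As a sanity check on these results, here is a simplified, self-contained model of the decision as I read it. It is not Beam code: `CommitModel` and its methods are hypothetical names, the config keys are the plain Kafka strings (`isolation.level`, `enable.auto.commit`), and the rule that KafkaIO skips its own finalize commit whenever `configuredKafkaCommit()` returns true is my assumption from the linked call sites, not something I found documented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of my reading of the linked KafkaIO code. NOT Beam's implementation.
public class CommitModel {

  // Mirrors the quoted configuredKafkaCommit(), but compares the string with equals()
  // instead of == (in Java, == on strings compares references, not contents).
  static boolean configuredKafkaCommit(Map<String, Object> consumerConfig) {
    return "read_committed".equals(consumerConfig.get("isolation.level"))
        || Boolean.TRUE.equals(consumerConfig.get("enable.auto.commit"));
  }

  // ASSUMPTION (from the linked call sites): the commit-in-finalize step is only
  // added when commitOffsetsInFinalize() is set AND configuredKafkaCommit() is false.
  static boolean beamCommitsInFinalize(boolean commitOffsetsInFinalize, Map<String, Object> cfg) {
    return commitOffsetsInFinalize && !configuredKafkaCommit(cfg);
  }

  // The Kafka consumer only commits by itself when enable.auto.commit is true;
  // isolation.level governs visibility of transactional records, not offset commits.
  // This model treats a missing enable.auto.commit key as false.
  static boolean kafkaAutoCommits(Map<String, Object> cfg) {
    return Boolean.TRUE.equals(cfg.get("enable.auto.commit"));
  }

  // Offsets end up committed if either Beam or the Kafka consumer commits them.
  static boolean offsetsCommitted(boolean commitOffsetsInFinalize, Map<String, Object> cfg) {
    return beamCommitsInFinalize(commitOffsetsInFinalize, cfg) || kafkaAutoCommits(cfg);
  }

  public static void main(String[] args) {
    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put("enable.auto.commit", true);

    Map<String, Object> readCommitted = new HashMap<>();
    readCommitted.put("isolation.level", "read_committed");

    Map<String, Object> empty = new HashMap<>();

    System.out.println("auto-commit only:        " + offsetsCommitted(false, autoCommit));
    System.out.println("finalize only:           " + offsetsCommitted(true, empty));
    System.out.println("read_committed only:     " + offsetsCommitted(false, readCommitted));
    System.out.println("read_committed+finalize: " + offsetsCommitted(true, readCommitted));
  }
}
```

Under those assumptions the first three lines print true, true, false, matching what I observed. The combination I did not test, read_committed together with commitOffsetsInFinalize(), prints false: the finalize commit is skipped, and nothing else commits.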

Environment: Dataflow, Java SDK 2.40.0, without explicitly enabling the SDF-based read.

So is it a bug, or what am I missing here?

If it is indeed a bug, which side is wrong? Either read_committed should
commit the offsets (although I found no explicit documentation about that
anywhere), or that isolation level should not take precedence over the
commit in finalize, and the method is wrong.
