Which, looking at your message again, would imply that the
configuredKafkaCommit() method shouldn't inspect isolation.level.
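
To make that concrete, here is a rough sketch of what the check might look like if it only considered auto-commit. This is not the actual KafkaIO code: the class name KafkaCommitCheck and the plain Map parameter are stand-ins I made up so the snippet runs without Beam or the Kafka client on the classpath. The only real names are the two config keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the check discussed in this thread; not KafkaIO itself.
final class KafkaCommitCheck {

  // Same string value as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in the Kafka client.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  // Returns true only when the consumer itself is configured to commit offsets.
  // isolation.level is deliberately ignored: read_committed only restricts which
  // records the consumer sees and does not commit any offsets.
  static boolean configuredKafkaCommit(Map<String, Object> consumerConfig) {
    return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
  }

  public static void main(String[] args) {
    Map<String, Object> readCommittedOnly = new HashMap<>();
    readCommittedOnly.put("isolation.level", "read_committed");

    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put(ENABLE_AUTO_COMMIT, true);

    System.out.println(configuredKafkaCommit(readCommittedOnly)); // false
    System.out.println(configuredKafkaCommit(autoCommit)); // true
  }
}
```

With a check like this, a pipeline that sets only withReadCommitted() would no longer be treated as if Kafka were already committing offsets, which matches the behaviour observed in the test described below.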

On Mon, Aug 8, 2022 at 3:27 PM John Casey <johnjca...@google.com> wrote:

> .withReadCommitted() doesn't commit messages when they are read. Instead, it
> specifies that the Kafka consumer should only read messages that have
> themselves been committed to Kafka.
>
> It is intended for exactly-once applications.
>
>
>
> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <balazs.nem...@aliz.ai>
> wrote:
>
>> I have been reading from Kafka and trying to figure out which offset
>> management would be the best for my use-case. During that I noticed
>> something odd.
>>
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>
>>     private boolean configuredKafkaCommit() {
>>       return getConsumerConfig().get("isolation.level") == "read_committed"
>>           || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>     }
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>
>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>
>> The name of the method, and how it's being used in the code, certainly
>> suggest that using the read_committed isolation level handles and commits
>> Kafka offsets. That seemed strange, but I'm not a Kafka pro, so I tested it.
>> Well, it does not.
>>
>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>> - using ONLY commitOffsetsInFinalize() does commit it
>>
>> - using ONLY withReadCommitted() does NOT commit it
>>
>> Tested on Dataflow with the 2.40.0 Java SDK, without explicitly enabling SDF-read.
>>
>> So is it a bug, or what am I missing here?
>>
>> If it is indeed a bug, is the problem with read_committed (so it should
>> commit offsets, although I found no explicit documentation about that
>> anywhere), or is it that having that isolation level shouldn't take
>> precedence over the commit in finalize, and that method is wrong?
>>
>>
>>
>>
