Thanks for the quick turnaround on this.

On Mon, Aug 8, 2022 at 9:34 PM Balázs Németh <balazs.nem...@aliz.ai> wrote:

> thanks, see https://github.com/apache/beam/issues/22631 +
> https://github.com/apache/beam/pull/22633
>
> On Mon, Aug 8, 2022 at 9:30 PM John Casey via dev <dev@beam.apache.org>
> wrote:
>
>> Which, looking at your message again, would imply that the
>> configuredKafkaCommit() method shouldn't inspect isolation.level.
>>
>> On Mon, Aug 8, 2022 at 3:27 PM John Casey <johnjca...@google.com> wrote:
>>
>>> .withReadCommitted() doesn't commit messages when they are read; it
>>> instead specifies that the Kafka consumer should only read messages that
>>> have themselves been committed to Kafka.
>>>
>>> It is meant for exactly-once applications.
>>>
>>>
>>>
>>> On Mon, Aug 8, 2022 at 3:16 PM Balázs Németh <balazs.nem...@aliz.ai>
>>> wrote:
>>>
>>>> I have been reading from Kafka and trying to figure out which offset
>>>> management would be the best for my use-case. During that I noticed
>>>> something odd.
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2359-L2362
>>>>
>>>>     private boolean configuredKafkaCommit() {
>>>>       return getConsumerConfig().get("isolation.level") == "read_committed"
>>>>           || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
>>>>     }
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2292-L2298
>>>>
>>>> https://github.com/apache/beam/blob/c9c161d018d116504fd5f83b48853a7071ec9ce4/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2321-L2334
>>>>
>>>> The name of the method, and how it's being used in the code, certainly
>>>> suggest that using the read_committed isolation level handles and commits
>>>> Kafka offsets. That seemed strange, but I'm not a Kafka pro, so I tested
>>>> it. Well, it does not.
>>>>
>>>> - using ONLY ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG does commit it
>>>> - using ONLY commitOffsetsInFinalize() does commit it
>>>>
>>>> - using ONLY withReadCommitted() does NOT commit it
>>>>
>>>> Tested on Dataflow with the 2.40.0 Java SDK, without explicitly enabling
>>>> SDF-read.
>>>>
>>>> So is it a bug, or what am I missing here?
>>>>
>>>> If it is indeed a bug, is the problem with read_committed (meaning it
>>>> should commit offsets, although I found no explicit documentation about
>>>> that anywhere), or is it that this isolation level shouldn't override the
>>>> commit in finalize, and the method is wrong?
>>>>
>>>>
>>>>
>>>>
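
For anyone reading this thread later, here is a minimal sketch of what the check could look like if it stops inspecting isolation.level, as suggested above. It is not the code from the linked PR or from KafkaIO itself: the class name CommitCheckSketch and the plain Map parameter are made up for illustration, and the key "enable.auto.commit" is written out as a literal (it is the value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) so the example compiles without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the KafkaIO check discussed above; not the actual Beam code. */
final class CommitCheckSketch {

  // Literal value of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  /**
   * Returns true only when the consumer itself is configured to auto-commit offsets.
   * isolation.level is deliberately ignored here: read_committed controls which
   * records are visible to the consumer, not whether offsets get committed.
   */
  static boolean configuredKafkaCommit(Map<String, Object> consumerConfig) {
    // Boolean.TRUE.equals(...) is null-safe, so a missing key counts as "not configured".
    return Boolean.TRUE.equals(consumerConfig.get(ENABLE_AUTO_COMMIT));
  }

  public static void main(String[] args) {
    Map<String, Object> readCommittedOnly = new HashMap<>();
    readCommittedOnly.put("isolation.level", "read_committed");

    Map<String, Object> autoCommit = new HashMap<>();
    autoCommit.put(ENABLE_AUTO_COMMIT, Boolean.TRUE);

    // Isolation level alone does not count as a configured commit.
    System.out.println(configuredKafkaCommit(readCommittedOnly));
    // Auto commit enabled on the consumer does.
    System.out.println(configuredKafkaCommit(autoCommit));
  }
}
```

As a side note, the quoted method compares strings with ==, which in Java checks object identity rather than content, so that comparison only holds when both sides happen to be the same interned instance. Dropping the isolation.level clause sidesteps that question as well.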
