On Thu, Jan 10, 2019 at 7:57 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Don’t you think that we could have some race condition there, since,
> according to the initial issue description, the offset was sometimes
> committed and sometimes not?
>

Yeah, there is a timing issue. 'finalizeCheckpoint()' does not wait until
the checkpoint is committed by the IO thread. See the comment at
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L613

This is best suited for the unbounded case, since we don't want to block on
each call to finalize a checkpoint. There can be many of these calls per
second in a streaming pipeline, and we only need to commit the latest
checkpoint. But that does not work well when the reader is used in a
bounded context.

Fix: KafkaIO could store a flag indicating that it is being read through the
bounded wrapper (see expand(), where the bounded wrapper is added). When this
flag is set, it could wake up the IO thread and wait for the offsets to be
committed.
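As a rough illustration of that fix (this is not the actual KafkaUnboundedReader code; all class and method names below are made up), the pattern is: finalize publishes only the latest offset and normally returns immediately, but when the bounded flag is set it wakes the IO thread and blocks until the commit has actually happened:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: an async committer that commits just the latest
// checkpoint, plus a "bounded" flag that makes finalize block until the
// commit has actually happened.
class CheckpointCommitter {
    private final AtomicLong latest = new AtomicLong(-1); // latest finalized offset
    private long committed = -1;                          // guarded by 'lock'
    private final boolean bounded;
    private final Object lock = new Object();
    private final Thread ioThread;
    private volatile boolean running = true;

    CheckpointCommitter(boolean bounded) {
        this.bounded = bounded;
        ioThread = new Thread(this::commitLoop, "offset-committer");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Unbounded case: publish the offset and return immediately.
    // Bounded case: wake the IO thread and wait until it has committed.
    void finalizeCheckpoint(long offset) {
        latest.set(offset);
        synchronized (lock) {
            lock.notifyAll();
            while (bounded && committed < offset) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private void commitLoop() {
        synchronized (lock) {
            while (running) {
                long target = latest.get();
                if (target > committed) {
                    committed = target;   // stand-in for consumer.commitSync(...)
                    lock.notifyAll();
                } else {
                    try { lock.wait(100); } catch (InterruptedException e) { return; }
                }
            }
        }
    }

    long committedOffset() { synchronized (lock) { return committed; } }

    void close() { running = false; ioThread.interrupt(); }
}
```

With bounded = false this keeps the current behavior (finalize never blocks, only the latest offset ever gets committed); with bounded = true the commit is guaranteed to have happened before finalize returns.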





>
>
> On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote:
>
> Oh, the generic bounded source wrapper over an unbounded source does not
> seem to call finalize when it is done with a split. I think it should.
>
> Could you file a bug for the wrapper?
> Meanwhile, this check could be added to the sanity checks in
> KafkaIO.Read.expand().
>
>
>
> On Wed, Jan 9, 2019 at 10:37 AM André Missaglia <
> andre.missag...@arquivei.com.br> wrote:
>
>> Hi Juan,
>>
>> After researching a bit, I found this issue, which is open since 2017:
>> https://issues.apache.org/jira/browse/BEAM-2185
>>
>> I guess KafkaIO isn't intended to provide a bounded source. Maybe I
>> should write my own code that fetches messages from Kafka, even if it
>> means giving up on some of Beam's processing guarantees...
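For what it's worth, the core of such a hand-rolled bounded read is small: poll until a record-count or time budget is exhausted, then commit synchronously before returning. A rough, Kafka-free sketch of that loop (the Fetcher interface and the commit callback are hypothetical stand-ins for KafkaConsumer.poll(...) and commitSync(...); nothing here is actual Beam or Kafka API):

```java
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch of a bounded read: poll until maxRecords or maxMillis
// is reached, then commit exactly once before returning.
class BoundedFetch {
    interface Fetcher { List<String> poll(); }   // one batch per call, may be empty

    static long run(Fetcher fetcher, LongConsumer commit,
                    long maxRecords, long maxMillis) {
        long deadline = System.currentTimeMillis() + maxMillis;
        long count = 0;
        while (count < maxRecords && System.currentTimeMillis() < deadline) {
            List<String> batch = fetcher.poll();
            // A real consumer may return empty polls transiently; here we
            // treat empty as end of available data for simplicity.
            if (batch.isEmpty()) break;
            count += batch.size();
        }
        commit.accept(count);   // commit before returning, unlike the async unbounded path
        return count;
    }
}
```

The key difference from the KafkaIO behavior discussed above is that the commit happens synchronously on the way out, so a bounded job can never finish without its offsets being committed.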
>>
>>
>> On Wed, 9 Jan 2019 at 14:24, Juan Carlos Garcia <
>> jcgarc...@gmail.com> wrote:
>>
>>> Just so you can have a look at where this happens:
>>>
>>>
>>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>>>
>>> Cheers
>>>
>>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>>> wrote:
>>>
>>>> I also experience the same. As per the documentation,
>>>> **withMaxReadTime** and **withMaxNumRecords** are mainly intended for
>>>> demo purposes, so I guess it is beyond the scope of the current KafkaIO
>>>> to behave as a bounded source with offset management, or something is
>>>> just missing in the current implementation (watermarking).
>>>>
>>>>
>>>>
>>>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
>>>> andre.missag...@arquivei.com.br> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I need to do some batch processing that uses messages in a Kafka
>>>>> topic. So I tried the "withMaxReadTime" KafkaIO setting:
>>>>>
>>>>> ---
>>>>> val properties = new Properties()
>>>>> properties.setProperty("bootstrap.servers", "...")
>>>>> properties.setProperty("group.id", "mygroup")
>>>>> properties.setProperty("sasl.jaas.config", "...")
>>>>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>>>>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>>>>> properties.setProperty("enable.auto.commit", "false")
>>>>>
>>>>> sc.customInput("Read From Kafka",
>>>>>   KafkaIO
>>>>>     .read[String, String]()
>>>>>     .withTopic("mytopic")
>>>>>     .withKeyDeserializer(classOf[StringDeserializer])
>>>>>     .withValueDeserializer(classOf[StringDeserializer])
>>>>>     .updateConsumerProperties(properties)
>>>>>     .withMaxReadTime(Duration.standardSeconds(20))
>>>>>     .withMaxNumRecords(1000000)
>>>>>     .commitOffsetsInFinalize()
>>>>>     .withoutMetadata()
>>>>> )
>>>>> .count.debug() // prints something between 10000 and 20000
>>>>> ---
>>>>> I can see that it was able to read the messages and process them. But
>>>>> in the end, no offset was committed:
>>>>>
>>>>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>>>>> mytopic  0          0               3094751         3094751  -            -     -
>>>>>
>>>>> But it is strange behavior: sometimes it commits the offset,
>>>>> sometimes not. I'm not sure if it is a bug or if I'm using the wrong
>>>>> configs.
>>>>>
>>>>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> --
>>>>> *André Badawi Missaglia*
>>>>> Data Engineer
>>>>> (16) 3509-5515 *|* www.arquivei.com.br
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> JC
>>>>
>>>>
>>>
>>>
>>
>>
>
>
