On Thu, Jan 10, 2019 at 7:57 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Don’t you think that we could have some race condition there since, > according to initial issue description, sometimes offset was committed and > sometimes not? > Yeah, there is a timing issue. 'finalizeCheckpoint()' does not wait until checkpoint is committed by the IO thread. See comment at https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L613 This is best suited for unbounded case, since we don't want to block on each call to finalize checkpoint. There could be lots of these calls per second in a streaming pipeline and we only need to commit the latest checkpoint. But that does not work well when this is used in bounded reader context. Fix: KafkaIO could store a flag that it is being read by a bounded wrapper (see expand() where the bounded wrapper is added). When this flag is set it could wake up the IO thread and wait for offsets to be committed. > > > On 9 Jan 2019, at 19:48, Raghu Angadi <ang...@gmail.com> wrote: > > Oh, the generic bounded source wrapper over an unbounded source does not > seem to call finalize when it is done with a split. I think it should. > > Could you file a bug for the wrapper? > Mean while, this check could be added sanity checks in > KafkaIO.Read.expand(). > > > > On Wed, Jan 9, 2019 at 10:37 AM André Missaglia < > andre.missag...@arquivei.com.br> wrote: > >> Hi Juan, >> >> After researching a bit, I found this issue, which is open since 2017: >> https://issues.apache.org/jira/browse/BEAM-2185 >> >> I guess KafkaIO isn't intended to provide a bounded source. Maybe I >> should write my own code that fetches messages from kafka, even if it means >> giving up on some processing guarantees from beam... >> >> >> Em qua, 9 de jan de 2019 às 14:24, Juan Carlos Garcia < >> jcgarc...@gmail.com> escreveu: >> >>> Just for you to have a look where this happen: >>> >>> >>> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584 >>> >>> Cheers >>> >>> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com> >>> wrote: >>> >>>> I also experience the same, as per the documentation **withMaxReadTime** >>>> and **withMaxNumRecords** are mainly used for Demo purposes, so i >>>> guess is beyond the scope of the current KafkaIO to behave as Bounded with >>>> offset management or just something is missing in the current >>>> implementation (Watermarking). >>>> >>>> >>>> >>>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia < >>>> andre.missag...@arquivei.com.br> wrote: >>>> >>>>> Hello everyone, >>>>> >>>>> I need to do some batch processing that uses messages in a Kafka >>>>> topic. So I tried the "withMaxReadTime" KafkaIO setting: >>>>> >>>>> --- >>>>> val properties = new Properties() >>>>> properties.setProperty("bootstrap.servers", "...") >>>>> properties.setProperty("group.id", "mygroup") >>>>> properties.setProperty("sasl.jaas.config", "...") >>>>> properties.setProperty("security.protocol", "SASL_PLAINTEXT") >>>>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256") >>>>> properties.setProperty("enable.auto.commit", "false") >>>>> >>>>> sc.customInput("Read From Kafka", >>>>> KafkaIO >>>>> .read[String, String]() >>>>> .withTopic("mytopic") >>>>> .withKeyDeserializer(classOf[StringDeserializer]) >>>>> .withValueDeserializer(classOf[StringDeserializer]) >>>>> .updateConsumerProperties(properties) >>>>> .withMaxReadTime(Duration.standardSeconds(20)) >>>>> .withMaxNumRecords(1000000) >>>>> .commitOffsetsInFinalize() >>>>> .withoutMetadata() >>>>> ) >>>>> .count.debug() // prints something between 10000 and 20000 >>>>> --- >>>>> I can see that it was able to read the messages and process them. But >>>>> in the end, no offset was commited: >>>>> >>>>> TOPIC PARTITION >>>>> CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID >>>>> HOST CLIENT-ID >>>>> mytopic 0 >>>>> 0 3094751 3094751 - >>>>> - - >>>>> >>>>> But it is a strange behavior: sometimes it commits the offset, >>>>> sometimes not. I'm not sure if it is a bug, or I'm using the wrong >>>>> configs. >>>>> >>>>> Has anyone used Bounded KafkaIO before? is there anything I can do? >>>>> >>>>> Best Regards, >>>>> >>>>> -- >>>>> *André Badawi Missaglia* >>>>> Data Engineer >>>>> (16) 3509-5515 *|* www.arquivei.com.br >>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> >>>>> [image: Arquivei.com.br – Inteligência em Notas Fiscais] >>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> >>>>> [image: Google seleciona Arquivei para imersão e mentoria no Vale do >>>>> Silício] >>>>> <https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad> >>>>> <https://www.facebook.com/arquivei> >>>>> <https://www.linkedin.com/company/arquivei> >>>>> <https://www.youtube.com/watch?v=KJFrh8h4Zds&yt%3Acc=on> >>>>> >>>> >>>> >>>> -- >>>> >>>> JC >>>> >>>> >>> >>> -- >>> >>> JC >>> >>> >> >> -- >> *André Badawi Missaglia* >> Data Engineer >> (16) 3509-5515 *|* www.arquivei.com.br >> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> >> [image: Arquivei.com.br – Inteligência em Notas Fiscais] >> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura> >> [image: Google seleciona Arquivei para imersão e mentoria no Vale do >> Silício] >> <https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad> >> <https://www.facebook.com/arquivei> >> <https://www.linkedin.com/company/arquivei> >> <https://www.youtube.com/watch?v=KJFrh8h4Zds&yt%3Acc=on> >> > >