Hi Juan,

After researching a bit, I found this issue, which has been open since
2017: https://issues.apache.org/jira/browse/BEAM-2185

I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
write my own code that fetches messages from Kafka, even if it means
giving up on some of Beam's processing guarantees... Something like the
sketch below is what I have in mind.
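Roughly: a plain consumer that reads a bounded batch and commits offsets
only after processing succeeds. Just a minimal, untested sketch assuming
the Java Kafka client 2.x; the topic, group id, and the process() helper
are placeholders carried over from my snippet below:

---
import java.time.Duration
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object BoundedKafkaRead {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    // Same cluster/SASL settings as in the KafkaIO snippet below.
    properties.setProperty("bootstrap.servers", "...")
    properties.setProperty("group.id", "mygroup")
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    // Commit manually, and only after the batch has been processed.
    properties.setProperty("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](properties)
    consumer.subscribe(List("mytopic").asJava)

    // Same bounds as withMaxNumRecords / withMaxReadTime: stop on
    // whichever limit is reached first.
    val maxRecords = 1000000
    val deadline   = System.currentTimeMillis() + 20 * 1000L

    val batch = collection.mutable.ArrayBuffer.empty[ConsumerRecord[String, String]]
    while (batch.size < maxRecords && System.currentTimeMillis() < deadline) {
      consumer.poll(Duration.ofSeconds(1)).iterator().asScala.foreach(batch += _)
    }

    process(batch) // placeholder for the actual batch processing

    // Commit exactly once, after processing succeeded. If process()
    // throws, nothing is committed and the batch is re-read next run.
    consumer.commitSync()
    consumer.close()
  }

  // Placeholder: replace with the real processing logic.
  def process(records: Seq[ConsumerRecord[String, String]]): Unit =
    records.foreach(r => println(s"${r.key} -> ${r.value}"))
}
---

Of course this is at-least-once at best, and I lose Beam's windowing and
runner-managed retries, which is exactly the trade-off I was hoping to
avoid.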
On Wed, Jan 9, 2019 at 2:24 PM Juan Carlos Garcia <jcgarc...@gmail.com>
wrote:

> Just so you can have a look at where this happens:
>
> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>
> Cheers
>
> On Wed, Jan 9, 2019 at 5:09 PM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> I also experience the same. As per the documentation, **withMaxReadTime**
>> and **withMaxNumRecords** are mainly intended for demo purposes, so I
>> guess either behaving as a bounded source with offset management is
>> beyond the scope of the current KafkaIO, or something is missing in the
>> current implementation (watermarking).
>>
>> On Wed, Jan 9, 2019 at 2:28 PM André Missaglia <
>> andre.missag...@arquivei.com.br> wrote:
>>
>>> Hello everyone,
>>>
>>> I need to do some batch processing that uses messages in a Kafka topic,
>>> so I tried the "withMaxReadTime" KafkaIO setting:
>>>
>>> ---
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", "...")
>>> properties.setProperty("group.id", "mygroup")
>>> properties.setProperty("sasl.jaas.config", "...")
>>> properties.setProperty("security.protocol", "SASL_PLAINTEXT")
>>> properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
>>> properties.setProperty("enable.auto.commit", "false")
>>>
>>> sc.customInput("Read From Kafka",
>>>   KafkaIO
>>>     .read[String, String]()
>>>     .withTopic("mytopic")
>>>     .withKeyDeserializer(classOf[StringDeserializer])
>>>     .withValueDeserializer(classOf[StringDeserializer])
>>>     .updateConsumerProperties(properties)
>>>     .withMaxReadTime(Duration.standardSeconds(20))
>>>     .withMaxNumRecords(1000000)
>>>     .commitOffsetsInFinalize()
>>>     .withoutMetadata()
>>> )
>>> .count.debug() // prints something between 10000 and 20000
>>> ---
>>>
>>> I can see that it was able to read the messages and process them, but
>>> in the end no offset was committed:
>>>
>>> TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
>>> mytopic  0          0               3094751         3094751  -            -     -
>>>
>>> It is strange behavior, though: sometimes it commits the offsets,
>>> sometimes not. I'm not sure whether it is a bug or I'm using the wrong
>>> configs.
>>>
>>> Has anyone used bounded KafkaIO before? Is there anything I can do?
>>>
>>> Best Regards,
>>>
>>> --
>>> *André Badawi Missaglia*
>>> Data Engineer
>>
>> --
>>
>> JC
>
> --
>
> JC

--
*André Badawi Missaglia*
Data Engineer