Hi Eduardo,

There is another workaround you can try without having to wait for the 2.7.0 release: use a wrapper that catches exceptions from KafkaConsumer#poll() and pass the wrapper to withConsumerFactoryFn() for the KafkaIO reader [1].
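The essence of such a wrapper is a retry loop around the call that may throw, so a transient failure no longer kills the calling thread. Stripped of the Kafka and Beam types, the pattern looks like this (a self-contained, runnable sketch; the class and method names are illustrative, not from KafkaIO):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryLoopSketch {
  // Keep retrying the operation until it succeeds, instead of letting the
  // calling thread die on an unchecked exception.
  static <T> T retryForever(Supplier<T> op, long sleepMillis) {
    while (true) {
      try {
        return op.get();
      } catch (RuntimeException e) {
        // In the real wrapper: LOG the exception, then sleep for ~1 second.
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

  public static void main(String[] args) {
    // Simulate a transient failure: the operation throws twice, then succeeds.
    AtomicInteger attempts = new AtomicInteger();
    String result = retryForever(() -> {
      if (attempts.incrementAndGet() < 3) {
        throw new RuntimeException("transient broker failure");
      }
      return "ok";
    }, 1L);
    System.out.println(result + " after " + attempts.get() + " attempts");
  }
}
```

The same loop, placed inside an overridden poll(), is what keeps the poll thread alive across broker disconnects.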
Using something like the following (such a wrapper is used in KafkaIO tests [2]):

private static class ConsumerFactoryFn
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    return new KafkaConsumer<byte[], byte[]>(config) {
      @Override
      public ConsumerRecords<byte[], byte[]> poll(long timeout) {
        // Work around for BEAM-5375: retry instead of letting the poll thread die.
        while (true) {
          try {
            return super.poll(timeout);
          } catch (Exception e) {
            // LOG the exception & sleep for a second before retrying.
          }
        }
      }
    };
  }
}

[1]: https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L417
[2]: https://github.com/apache/beam/blob/release-2.4.0/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java#L261

On Tue, Sep 18, 2018 at 5:49 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, we're not sure how long the network was down. According to the
> logs, no longer than one minute. A 30-second shutdown would work for the
> tests.
>
> Regards
>
> On Fri, Sep 14, 2018 at 9:41 PM, Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks. I could repro it myself as well. How long was the network down?
>>
>> Trying to get the fix into 2.7 RC2.
>>
>> On Fri, Sep 14, 2018 at 12:25 PM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Just to make myself clear, I'm not sure how to use the patch, but if
>>> you could send us some guidance, that would be great.
>>>
>>> On Fri, Sep 14, 2018 at 4:24 PM, Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>
>>>> Hi Raghu, yes, it is feasible, would you do that for us? I'm not sure
>>>> how we'd use the patch. We're using SBT and Spotify's Scio with Scala.
>>>>
>>>> Thanks
>>>>
>>>> On Fri, Sep 14, 2018 at 4:07 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>>> Is it feasible for you to verify the fix in your dev job? I can make
>>>>> a patch against the Beam 2.4 branch if you like.
>>>>>
>>>>> Raghu.
>>>>>
>>>>> On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi Raghu, thank you very much for the pull request.
>>>>>> We'll wait for the 2.7 Beam release.
>>>>>>
>>>>>> Regards!
>>>>>>
>>>>>> On Thu, Sep 13, 2018 at 6:19 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>
>>>>>>> Fix: https://github.com/apache/beam/pull/6391
>>>>>>>
>>>>>>> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>
>>>>>>>> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>.
>>>>>>>> I will fix it later this week.
>>>>>>>>
>>>>>>>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the job id. I looked at the worker logs (following the
>>>>>>>>>> usual support oncall access protocol that provides temporary access
>>>>>>>>>> to things like logs in GCP):
>>>>>>>>>>
>>>>>>>>>> The root issue looks to be that consumerPollLoop(), mentioned
>>>>>>>>>> earlier, needs to handle unchecked exceptions. In your case it is
>>>>>>>>>> clear that the poll thread exited with a runtime exception. The
>>>>>>>>>> reader does not check for that and continues to wait for the poll
>>>>>>>>>> thread to enqueue messages. A fix should result in an IOException
>>>>>>>>>> for reads from the source. The runners will handle that
>>>>>>>>>> appropriately after that. I will file a jira.
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>>>>>>
>>>>>>>>> Ignore the link; it was pasted here by mistake.
>>>>>>>>>
>>>>>>>>>> From the logs (with a comment below each one):
>>>>>>>>>>
>>>>>>>>>> - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0 starting at offset 2
>>>>>>>>>>   - Implies the reader is initialized and the poll thread is started.
>>>>>>>>>> - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>>>>>>>>   - The reader actually got a message received by the poll thread from Kafka.
>>>>>>>>>> - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching latest offset for partition kafka_topic-0. will be retried.
>>>>>>>>>>   - This must have happened around the time the network was disrupted. This log is from another periodic task that fetches latest offsets for partitions.
>>>>>>>>>>
>>>>>>>>>> The poll thread must have died around the time the network was disrupted.
>>>>>>>>>>
>>>>>>>>>> The following log comes from the Kafka client itself and is printed
>>>>>>>>>> every second when KafkaIO fetches the latest offset. This log seems
>>>>>>>>>> to have been added in recent versions; it is probably unintentional.
>>>>>>>>>> I don't think there is any better way to fetch latest offsets than
>>>>>>>>>> how KafkaIO does it now. This is logged inside consumer.position()
>>>>>>>>>> called at [1].
>>>>>>>>>>
>>>>>>>>>> - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>>>>>>>>
>>>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>>>>>>
>>>>>>>>> This 'Resetting offset' is harmless, but is quite annoying to see.
>>>>>>>>> One way to avoid it is to set the Kafka consumer's log level to
>>>>>>>>> WARNING. Ideally KafkaIO itself should do something to avoid it
>>>>>>>>> without a user option.
>>>>>>>>>
>>>>>>>>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Raghu! The job_id of our dev job is 2018-09-12_06_11_48-5600553605191377866.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 12, 2018 at 2:18 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for debugging.
>>>>>>>>>>>> Can you provide the job_id of your dev job? The stacktrace shows
>>>>>>>>>>>> that there is no thread running 'consumerPollLoop()', which can
>>>>>>>>>>>> explain the stuck reader. You will likely find logs at lines 594
>>>>>>>>>>>> & 587 [1]. Dataflow caches its readers and DirectRunner may not,
>>>>>>>>>>>> which can explain why DirectRunner resumes reads. The expectation
>>>>>>>>>>>> in KafkaIO is that the Kafka client library takes care of
>>>>>>>>>>>> retrying in case of connection problems (as documented). It is
>>>>>>>>>>>> possible that in some cases poll() throws and we need to restart
>>>>>>>>>>>> the client in KafkaIO.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Raghu, thanks for your help.
>>>>>>>>>>>>> Just answering your previous question: the following logs were
>>>>>>>>>>>>> the same as before the error, as if the pipeline were still
>>>>>>>>>>>>> getting the messages, for example:
>>>>>>>>>>>>>
>>>>>>>>>>>>> (...)
>>>>>>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>>>>>>>>> ERROR
>>>>>>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>>>>>>>>> (...)
>>>>>>>>>>>>>
>>>>>>>>>>>>> But when checking the Kafka consumer group, the current offset
>>>>>>>>>>>>> stays at 15, the committed offset of the last message processed
>>>>>>>>>>>>> before the error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We'll file a bug; we were now able to reproduce the issue in a
>>>>>>>>>>>>> dev scenario. We started the same pipeline using the direct
>>>>>>>>>>>>> runner, without Google Dataflow. We blocked the Kafka broker's
>>>>>>>>>>>>> network and the same error was thrown. Then we unblocked the
>>>>>>>>>>>>> network and the pipeline was able to successfully process the
>>>>>>>>>>>>> subsequent messages. When we started the same pipeline on the
>>>>>>>>>>>>> Dataflow runner and did the same test, the same problem from our
>>>>>>>>>>>>> production scenario happened: Dataflow couldn't process the new
>>>>>>>>>>>>> messages. Unfortunately we've stopped the Dataflow job in
>>>>>>>>>>>>> production, but the problematic dev job is still running and the
>>>>>>>>>>>>> log file of the VM is attached. Thank you very much.
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 6:28 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Specifically, I am interested in whether you have any thread
>>>>>>>>>>>>>> running 'consumerPollLoop()' [1]. There should always be one
>>>>>>>>>>>>>> (if a worker is assigned one of the partitions).
>>>>>>>>>>>>>> It is possible that the KafkaClient itself hasn't recovered
>>>>>>>>>>>>>> from the group coordinator error (though unlikely).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Eduardo,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In case of any error, the pipeline should keep trying to
>>>>>>>>>>>>>>> fetch. I don't know about this particular error. Do you see
>>>>>>>>>>>>>>> any others after it in the log?
>>>>>>>>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>>>>>>>>> - log in to one of the VMs and get a stacktrace of the Java
>>>>>>>>>>>>>>> worker (look for a container called java-streaming)
>>>>>>>>>>>>>>> - file a support bug or Stack Overflow question with the job
>>>>>>>>>>>>>>> id so that the Dataflow oncall can take a look.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow
>>>>>>>>>>>>>>>> using KafkaIO. Suddenly the pipeline stopped fetching Kafka
>>>>>>>>>>>>>>>> messages altogether, while our workers from other pipelines
>>>>>>>>>>>>>>>> continued to get Kafka messages.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> At the moment it stopped, we got these messages:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Error
>>>>>>>>>>>>>>>> sending fetch request (sessionId=1396189203, epoch=2431598)
>>>>>>>>>>>>>>>> to node 3: org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Group
>>>>>>>>>>>>>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is
>>>>>>>>>>>>>>>> unavailable or invalid, will attempt rediscovery
>>>>>>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe]
>>>>>>>>>>>>>>>> Discovered group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>>>>>>>>   .withTopic(topic)
>>>>>>>>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>>>>>>>>>   .withoutMetadata()
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any help will be much appreciated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>>>>>>>>> Data Engineer
>>>>>>>>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>>>>>>>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
>>>>>>>>>>>>>>>> [image: Arquivei.com.br – Inteligência em Notas Fiscais]
>>>>>>>>>>>>>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
>>>>>>>>>>>>>>>> [image: Google seleciona Arquivei para imersão e mentoria no Vale do Silício]
>>>>>>>>>>>>>>>> <https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad>
>>>>>>>>>>>>>>>> <https://www.facebook.com/arquivei>
>>>>>>>>>>>>>>>> <https://www.linkedin.com/company/arquivei>
>>>>>>>>>>>>>>>> <https://www.youtube.com/watch?v=sSUUKxbXnxk>