Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I will fix it later this week.
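As a rough, stdlib-only Java sketch of the kind of fix described below (the background poll thread records any unchecked exception so the reader can surface it as an IOException instead of waiting forever on an empty queue) — class and method names here are illustrative, not Beam's actual API:

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PollSketch {
    // Hypothetical reader: a poll thread feeds a queue; if the poll body
    // dies with a RuntimeException, the failure is remembered so advance()
    // can rethrow it rather than block indefinitely.
    static class PollingReader implements AutoCloseable {
        private final BlockingQueue<String> records = new LinkedBlockingQueue<>();
        private final AtomicReference<RuntimeException> pollFailure = new AtomicReference<>();
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        PollingReader(Runnable pollOnce) {
            executor.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        pollOnce.run();          // stands in for consumer.poll(...)
                        records.put("record");   // unbounded here; a real reader bounds this
                    }
                } catch (RuntimeException e) {
                    pollFailure.set(e);          // remember the failure for the reader
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        /** Returns the next record, or throws IOException if the poll thread died. */
        String advance() throws IOException {
            while (true) {
                RuntimeException failure = pollFailure.get();
                if (failure != null) {
                    throw new IOException("poll thread failed", failure);
                }
                try {
                    String r = records.poll(10, TimeUnit.MILLISECONDS);
                    if (r != null) {
                        return r;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        }

        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Simulate the reported scenario: the poll body throws after a
        // network disruption, and the reader now fails fast.
        PollingReader reader = new PollingReader(() -> {
            throw new RuntimeException("network disrupted");
        });
        try {
            reader.advance();
            System.out.println("unexpected: got a record");
        } catch (IOException e) {
            System.out.println("reader failed fast as intended");
        } finally {
            reader.close();
        }
    }
}
```

Without the AtomicReference check, advance() would loop forever on the empty queue, which matches the stuck-reader symptom in the thread.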
On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:
>
> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks for the job id. I looked at the worker logs (following the usual
>> support on-call access protocol that provides temporary access to things
>> like logs in GCP).
>>
>> The root issue looks like consumerPollLoop() mentioned earlier needs to
>> handle unchecked exceptions. In your case it is clear that the poll thread
>> exited with a runtime exception. The reader does not check for that and
>> continues to wait for the poll thread to enqueue messages. A fix should
>> result in an IOException for a read from the source; the runners will
>> handle that appropriately after that. I will file a JIRA.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>
> Ignore the link; it was pasted here by mistake.
>
>> From the logs (with a comment below each one):
>>
>> - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0 starting at offset 2
>>   - Implies the reader is initialized and the poll thread is started.
>> - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>   - The reader actually got a message received by the poll thread from Kafka.
>> - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching latest offset for partition kafka_topic-0. will be retried.
>>   - This must have happened around the time the network was disrupted.
>>     This log is from another periodic task that fetches latest offsets for partitions.
>>
>> The poll thread must have died around the time the network was disrupted.
>>
>> The following log comes from the Kafka client itself and is printed every
>> second when KafkaIO fetches the latest offset. This log seems to have been
>> added in recent versions; it is probably unintentional. I don't think there
>> is any better way to fetch latest offsets than how KafkaIO does it now.
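As a side note, one way to silence this per-second INFO line is to raise the log level of the Kafka consumer client's loggers, e.g. with a log4j.properties fragment like the following (logger name and log4j backend are assumptions; adjust for your logging setup):

```properties
# Suppress the per-second "Resetting offset for partition ..." INFO line
# emitted by the Kafka consumer client during latest-offset fetches.
log4j.logger.org.apache.kafka.clients.consumer=WARN
```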
>> The log is emitted inside consumer.position(), called at [1]:
>>
>> - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 3.
>>
>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>
> This 'Resetting offset' log is harmless, but it is quite annoying to see in
> the worker logs. One way to avoid it is to set the Kafka consumer's log
> level to WARNING. Ideally KafkaIO itself should do something to avoid it
> without a user option.
>
>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu! The job_id of our dev job is 2018-09-12_06_11_48-5600553605191377866.
>>>
>>> Thanks!
>>>
>>> On Wed, Sep 12, 2018 at 14:18, Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> Thanks for debugging.
>>>> Can you provide the job_id of your dev job? The stack trace shows that
>>>> there is no thread running consumerPollLoop(), which can explain the
>>>> stuck reader. You will likely find logs at lines 594 & 587 [1]. Dataflow
>>>> caches its readers and DirectRunner may not, which can explain why
>>>> DirectRunner resumes reads. The expectation in KafkaIO is that the Kafka
>>>> client library takes care of retrying in case of connection problems (as
>>>> documented). It is possible that in some cases poll() throws and we need
>>>> to restart the client in KafkaIO.
>>>>
>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>
>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>
>>>>> Hi Raghu, thanks for your help.
>>>>> Just answering your previous question: the following logs were the
>>>>> same as before the error, as if the pipeline were still getting the
>>>>> messages, for example:
>>>>>
>>>>> (...)
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>> ERROR
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>> (...)
>>>>>
>>>>> But when checking the Kafka consumer group, the current offset stays
>>>>> at 15, the committed offset from the last processed message before the
>>>>> error.
>>>>>
>>>>> We'll file a bug, but we could now reproduce the issue in a dev
>>>>> scenario. We started the same pipeline using the direct runner, without
>>>>> Google Dataflow. We blocked the Kafka broker's network and the same
>>>>> error was thrown. Then we unblocked the network and the pipeline was
>>>>> able to successfully process the subsequent messages.
>>>>> When we started the same pipeline in the Dataflow runner and did the
>>>>> same test, the same problem from our production scenario happened:
>>>>> Dataflow couldn't process the new messages. Unfortunately, we've stopped
>>>>> the Dataflow job in production, but the problematic dev job is still
>>>>> running and the log file of the VM is attached. Thank you very much.
>>>>> Best regards
>>>>>
>>>>> On Tue, Sep 11, 2018 at 18:28, Raghu Angadi <rang...@google.com> wrote:
>>>>>
>>>>>> Specifically, I am interested in whether you have any thread running
>>>>>> consumerPollLoop() [1]. There should always be one (if a worker is
>>>>>> assigned one of the partitions). It is possible that the Kafka client
>>>>>> itself hasn't recovered from the group coordinator error (though
>>>>>> unlikely).
>>>>>>
>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>
>>>>>>> Hi Eduardo,
>>>>>>>
>>>>>>> In case of any error, the pipeline should keep on trying to fetch. I
>>>>>>> don't know about this particular error. Do you see any others
>>>>>>> afterwards in the log?
>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>> - log in to one of the VMs and get a stack trace of the Java worker
>>>>>>>   (look for a container called java-streaming)
>>>>>>> - file a support bug or Stack Overflow question with the job id so
>>>>>>>   that the Dataflow on-call can take a look.
>>>>>>>
>>>>>>> Raghu.
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages
>>>>>>>> entirely, while our workers from other pipelines continued to get
>>>>>>>> Kafka messages.
>>>>>>>>
>>>>>>>> At the moment it stopped we got these messages:
>>>>>>>>
>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Error sending fetch request (sessionId=1396189203, epoch=2431598) to node 3: org.apache.kafka.common.errors.DisconnectException.
>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>
>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>
>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>
>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>   .withTopic(topic)
>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>   .withoutMetadata()
>>>>>>>>
>>>>>>>> Any help will be much appreciated.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> --
>>>>>>>> Eduardo Soldera Garcia
>>>>>>>> Data Engineer
>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
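The other idea raised in the thread, that poll() may throw and KafkaIO may need to restart the client, can be sketched as a generic retry-with-fresh-client loop. This is a stdlib-only illustration with made-up names (Client, pollWithRestart), not Beam or Kafka code:

```java
import java.util.function.Supplier;

public class RestartSketch {
    // Hypothetical stand-in for a Kafka consumer.
    interface Client {
        String poll();
    }

    /**
     * Polls via a freshly built client, rebuilding it after each unchecked
     * exception (e.g. a DisconnectException), up to maxRestarts times.
     */
    static String pollWithRestart(Supplier<Client> factory, int maxRestarts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            Client client = factory.get();   // fresh client per attempt
            try {
                return client.poll();
            } catch (RuntimeException e) {
                last = e;                    // remember, then restart the client
            }
        }
        throw last;                          // give up after maxRestarts
    }

    public static void main(String[] args) {
        // Simulate a client that fails twice (disconnects), then recovers.
        final int[] polls = {0};
        Supplier<Client> factory = () -> () -> {
            if (polls[0]++ < 2) {
                throw new RuntimeException("disconnected");
            }
            return "message-after-reconnect";
        };
        System.out.println(pollWithRestart(factory, 5));
    }
}
```

The point of the sketch is only the control flow: the reader recovers by discarding the wedged client instead of waiting on it, which is the behavior the DirectRunner test above exhibited and the Dataflow job did not.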