Fix: https://github.com/apache/beam/pull/6391
On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <rang...@google.com> wrote:

> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I will
> fix it later this week.
>
> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:
>
>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Thanks for the job id. I looked at the worker logs (following the usual
>>> support-oncall access protocol that provides temporary access to things
>>> like logs in GCP).
>>>
>>> The root issue looks like consumerPollLoop(), mentioned earlier, needs
>>> to handle unchecked exceptions. In your case it is clear that the poll
>>> thread exited with a runtime exception. The reader does not check for
>>> this and continues to wait for the poll thread to enqueue messages. A
>>> fix should result in an IOException for reads from the source; the
>>> runners will handle that appropriately. I will file a JIRA.
>>>
>>> From the logs (with a comment below each one):
>>>
>>> - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
>>>   starting at offset 2
>>>   - Implies the reader is initialized and the poll thread is started.
>>> - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>   - The reader actually got a message received by the poll thread
>>>     from Kafka.
>>> - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching
>>>   latest offset for partition kafka_topic-0. will be retried.
>>>   - This must have happened around the time the network was disrupted.
>>>     This log is from a separate periodic task that fetches the latest
>>>     offsets for partitions.
>>>
>>> The poll thread must have died around the time the network was disrupted.
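The fix pattern described above (poll thread records its unchecked exception, and the reader rethrows it as an IOException instead of waiting forever) can be sketched roughly as follows. This is an illustrative, self-contained sketch; the class and method names are hypothetical stand-ins, not the actual KafkaUnboundedReader code:

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of surfacing a background poll thread's death to the reader.
public class PollLoopSketch {
  private final BlockingQueue<String> records = new ArrayBlockingQueue<>(16);
  private final AtomicReference<Throwable> pollFailure = new AtomicReference<>();

  // Runs on a background thread, like consumerPollLoop(). Without the
  // catch block, a runtime exception would kill the thread silently.
  void pollLoop(Runnable pollOnce) {
    try {
      pollOnce.run(); // stand-in for consumer.poll() + enqueue
    } catch (RuntimeException e) {
      pollFailure.set(e); // record the failure for the reader to see
    }
  }

  // Called by the reader; checks for a dead poll thread on every wait,
  // instead of blocking on the queue indefinitely.
  String nextRecord() throws IOException {
    while (true) {
      Throwable t = pollFailure.get();
      if (t != null) {
        throw new IOException("poll thread failed", t);
      }
      try {
        String r = records.poll(10, TimeUnit.MILLISECONDS);
        if (r != null) {
          return r;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    PollLoopSketch sketch = new PollLoopSketch();
    Thread poller = new Thread(() -> sketch.pollLoop(() -> {
      throw new RuntimeException("broker disconnected"); // simulated network failure
    }));
    poller.start();
    poller.join();
    try {
      sketch.nextRecord();
      System.out.println("unexpected: got a record");
    } catch (IOException expected) {
      System.out.println("reader surfaced failure: " + expected.getCause().getMessage());
    }
  }
}
```

With the `pollFailure` check removed, `nextRecord()` would spin on the empty queue forever, which matches the stuck-reader symptom in the thread; the runner only gets a chance to retry or fail the work item once the reader actually throws.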
>>>
>>> The following log comes from the Kafka client itself and is printed
>>> every second when KafkaIO fetches the latest offsets. It seems to have
>>> been added in recent client versions, probably unintentionally. I don't
>>> think there is any better way to fetch latest offsets than how KafkaIO
>>> does now. It is logged inside consumer.position(), called at [1]:
>>>
>>> - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
>>>   groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting
>>>   offset for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>
>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>
>> This 'Resetting offset' log is harmless, but quite annoying to see in
>> the worker logs. One way to avoid it is to set the Kafka consumer's log
>> level to WARNING. Ideally KafkaIO itself should avoid it without a user
>> option.
>>
>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
>>>> Hi Raghu! The job_id of our dev job is
>>>> 2018-09-12_06_11_48-5600553605191377866.
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Sep 12, 2018 at 2:18 PM, Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for debugging.
>>>>> Can you provide the job_id of your dev job? The stack trace shows
>>>>> that there is no thread running 'consumerPollLoop()', which can
>>>>> explain the stuck reader. You will likely find logs at lines 594 &
>>>>> 587 [1]. Dataflow caches its readers and DirectRunner may not, which
>>>>> can explain why DirectRunner resumes reads. The expectation in
>>>>> KafkaIO is that the Kafka client library takes care of retrying in
>>>>> case of connection problems (as documented). It is possible that in
>>>>> some cases poll() throws and we need to restart the client in
>>>>> KafkaIO.
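As a sketch of the WARNING-level workaround mentioned above: on Dataflow, worker log levels can be overridden per package through the worker logging pipeline options. The exact flag shape below is assumed from those options and should be verified against your SDK version; the package prefix shown is the broad Kafka client namespace, not a confirmed logger name:

```
--workerLogLevelOverrides={"org.apache.kafka.clients.consumer":"WARN"}
```

This silences the per-second 'Resetting offset' INFO lines on the workers while still surfacing consumer warnings and errors.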
>>>>>
>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>>
>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi Raghu, thanks for your help.
>>>>>> Just answering your previous question: the following logs were the
>>>>>> same as before the error, as if the pipeline were still getting the
>>>>>> messages. For example:
>>>>>>
>>>>>> (...)
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>> ERROR
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>> (...)
>>>>>>
>>>>>> But when checking the Kafka consumer group, the current offset stays
>>>>>> at 15, the committed offset from the last processed message before
>>>>>> the error.
>>>>>>
>>>>>> We'll file a bug, but we could now reproduce the issue in a dev
>>>>>> scenario. We started the same pipeline using the direct runner,
>>>>>> without Google Dataflow. We blocked the Kafka broker network and the
>>>>>> same error was thrown. Then we unblocked the network and the
>>>>>> pipeline was able to successfully process the subsequent messages.
>>>>>> When we started the same pipeline in the Dataflow runner and did the
>>>>>> same test, the same problem from our production scenario happened:
>>>>>> Dataflow couldn't process the new messages. Unfortunately, we've
>>>>>> stopped the Dataflow job in production, but the problematic dev job
>>>>>> is still running and the log file of the VM is attached. Thank you
>>>>>> very much.
>>>>>> Best regards
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 6:28 PM, Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Specifically, I am interested in whether you have any thread
>>>>>>> running 'consumerPollLoop()' [1]. There should always be one (if a
>>>>>>> worker is assigned one of the partitions). It is possible that the
>>>>>>> Kafka client itself hasn't recovered from the group-coordinator
>>>>>>> error (though unlikely).
>>>>>>>
>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Eduardo,
>>>>>>>>
>>>>>>>> In case of any error, the pipeline should keep trying to fetch. I
>>>>>>>> don't know about this particular error. Do you see any others
>>>>>>>> afterwards in the log?
>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>> - Log in to one of the VMs and get a stack trace of the Java
>>>>>>>>   worker (look for a container called java-streaming).
>>>>>>>> - File a support bug or Stack Overflow question with the job id so
>>>>>>>>   that the Dataflow oncall can take a look.
>>>>>>>>
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>>>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages at
>>>>>>>>> all, while our other workers from other pipelines continued to
>>>>>>>>> get Kafka messages.
>>>>>>>>>
>>>>>>>>> At the moment it stopped we got these messages:
>>>>>>>>>
>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Error sending
>>>>>>>>> fetch request (sessionId=1396189203, epoch=2431598) to node 3:
>>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Group
>>>>>>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is
>>>>>>>>> unavailable or invalid, will attempt rediscovery
>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Discovered
>>>>>>>>> group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>
>>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>>
>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>
>>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>   .withTopic(topic)
>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>>   .withoutMetadata()
>>>>>>>>>
>>>>>>>>> Any help will be much appreciated.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> --
>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>> Data Engineer
>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br