Hi Raghu, thank you very much for the pull request. We'll wait for the 2.7 Beam release.
Regards!

On Thu, Sep 13, 2018 at 6:19 PM, Raghu Angadi <[email protected]> wrote:

> Fix: https://github.com/apache/beam/pull/6391
>
> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <[email protected]> wrote:
>
>> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I
>> will fix it later this week.
>>
>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <[email protected]> wrote:
>>
>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> Thanks for the job id. I looked at the worker logs (following the
>>>> usual support on-call access protocol that provides temporary access
>>>> to things like logs in GCP).
>>>>
>>>> The root issue looks like the consumerPollLoop() mentioned earlier
>>>> needs to handle unchecked exceptions. In your case it is clear that
>>>> the poll thread exited with a runtime exception. The reader does not
>>>> check for this and continues to wait for the poll thread to enqueue
>>>> messages. A fix should surface an IOException on reads from the
>>>> source; the runners will then handle it appropriately. I will file a
>>>> JIRA.
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>
>>> Ignore the link above; it was pasted here by mistake.
>>>
>>>> From the logs (with a comment below each one):
>>>>
>>>> - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
>>>>   starting at offset 2
>>>>   - Implies the reader is initialized and the poll thread has started.
>>>> - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>>   - The reader actually got a message received by the poll thread
>>>>     from Kafka.
>>>> - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching
>>>>   latest offset for partition kafka_topic-0. will be retried.
>>>>   - This must have happened around the time the network was
>>>>     disrupted. This log is from another periodic task that fetches
>>>>     latest offsets for partitions.
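The failure mode described in the message above (a poll thread dying with an unchecked exception while the reader keeps waiting on an empty queue) can be illustrated outside of Beam. The sketch below is hypothetical: the class, queue, and method shapes are invented for illustration and are not the actual KafkaUnboundedReader code; it only shows the capture-and-rethrow pattern such a fix needs.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the real KafkaUnboundedReader: the poll thread
// records any unchecked exception instead of dying silently, and the reader
// rethrows it as an IOException instead of waiting forever.
class PollLoopSketch {
    private final BlockingQueue<String> records = new ArrayBlockingQueue<>(10);
    private final AtomicReference<Throwable> pollError = new AtomicReference<>();

    // Runs on a dedicated thread; 'source' stands in for consumer.poll(...).
    public void consumerPollLoop(Iterable<String> source) {
        try {
            for (String r : source) {
                records.put(r);
            }
        } catch (Throwable t) {
            // Capture the failure so the reader can see it, rather than
            // letting the thread exit silently.
            pollError.set(t);
        }
    }

    // Reader side: surface the poll thread's failure as an IOException.
    public String advance() throws IOException {
        while (true) {
            Throwable t = pollError.get();
            if (t != null) {
                throw new IOException("poll thread failed", t);
            }
            try {
                String r = records.poll(10, TimeUnit.MILLISECONDS);
                if (r != null) {
                    return r;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }
}
```

With this shape, a runtime exception in the poll loop reaches the runner as an IOException on the next read, instead of the reader blocking forever on the queue.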
>>>> The poll thread must have died around the time the network was
>>>> disrupted.
>>>>
>>>> The following log comes from the Kafka client itself and is printed
>>>> every second when KafkaIO fetches the latest offsets. It seems to have
>>>> been added in recent client versions, probably unintentionally. I
>>>> don't think there is any better way to fetch latest offsets than how
>>>> KafkaIO does it now. This is logged inside consumer.position(), called
>>>> at [1]:
>>>>
>>>> - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
>>>>   groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting
>>>>   offset for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>
>>> This 'Resetting offset' log is harmless, but it is quite annoying to
>>> see in the worker logs. One way to avoid it is to set the Kafka
>>> consumer's log level to WARNING. Ideally KafkaIO itself would avoid it
>>> without requiring a user option.
>>>
>>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera
>>>> <[email protected]> wrote:
>>>>
>>>>> Hi Raghu! The job_id of our dev job is
>>>>> 2018-09-12_06_11_48-5600553605191377866.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Sep 12, 2018 at 2:18 PM, Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> Thanks for debugging.
>>>>>> Can you provide the job_id of your dev job? The stacktrace shows
>>>>>> that there is no thread running 'consumerPollLoop()', which can
>>>>>> explain the stuck reader. You will likely find logs at lines 594
>>>>>> and 587 [1]. Dataflow caches its readers and the DirectRunner may
>>>>>> not, which can explain why the DirectRunner resumes reads.
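A side note on the 'Resetting offset' noise discussed above: on the Dataflow runner, per-package worker log levels can be overridden with a pipeline option. A hedged sketch (the option is Dataflow's worker log-level override; the package shown is an assumption about which logger emits the line, so adjust it to match your logs):

```
--workerLogLevelOverrides={"org.apache.kafka.clients.consumer": "WARN"}
```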
>>>>>> The expectation in KafkaIO is that the Kafka client library takes
>>>>>> care of retrying in case of connection problems (as documented). It
>>>>>> is possible that in some cases poll() throws and we need to restart
>>>>>> the client in KafkaIO.
>>>>>>
>>>>>> [1]:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>>>
>>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Raghu, thanks for your help.
>>>>>>> Answering your previous question: the logs after the error were
>>>>>>> the same as before it, as if the pipeline were still getting the
>>>>>>> messages. For example:
>>>>>>>
>>>>>>> (...)
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>>> ERROR
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>>> (...)
>>>>>>>
>>>>>>> But when checking the Kafka consumer group, the current offset
>>>>>>> stays at 15, the committed offset of the last message processed
>>>>>>> before the error.
>>>>>>>
>>>>>>> We'll file a bug, but we have now been able to reproduce the issue
>>>>>>> in a dev scenario. We started the same pipeline using the
>>>>>>> DirectRunner, without Google Dataflow, blocked the Kafka broker's
>>>>>>> network, and the same error was thrown. When we unblocked the
>>>>>>> network, the pipeline successfully processed the subsequent
>>>>>>> messages. When we started the same pipeline on the Dataflow runner
>>>>>>> and repeated the test, the same problem as in our production
>>>>>>> scenario happened: Dataflow couldn't process the new messages.
>>>>>>> Unfortunately we've stopped the Dataflow job in production, but
>>>>>>> the problematic dev job is still running and the log file of the
>>>>>>> VM is attached. Thank you very much.
>>>>>>> Best regards
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 6:28 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>
>>>>>>>> Specifically, I am interested in whether you have any thread
>>>>>>>> running 'consumerPollLoop()' [1]. There should always be one (if
>>>>>>>> a worker is assigned one of the partitions). It is possible that
>>>>>>>> the Kafka client itself hasn't recovered from the group
>>>>>>>> coordinator error (though unlikely).
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Eduardo,
>>>>>>>>>
>>>>>>>>> In case of any error, the pipeline should keep on trying to
>>>>>>>>> fetch. I don't know about this particular error. Do you see any
>>>>>>>>> others afterwards in the log?
>>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>>> - Log in to one of the VMs and get a stacktrace of the Java
>>>>>>>>>   worker (look for a container called java-streaming).
>>>>>>>>> - File a support bug or Stack Overflow question with the job id
>>>>>>>>>   so that the Dataflow on-call can take a look.
>>>>>>>>>
>>>>>>>>> Raghu.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow
>>>>>>>>>> using KafkaIO. Suddenly the pipeline stopped fetching Kafka
>>>>>>>>>> messages entirely, while our workers from other pipelines
>>>>>>>>>> continued to get Kafka messages.
>>>>>>>>>>
>>>>>>>>>> At the moment it stopped, we got these messages:
>>>>>>>>>>
>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Error sending
>>>>>>>>>> fetch request (sessionId=1396189203, epoch=2431598) to node 3:
>>>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Group
>>>>>>>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is
>>>>>>>>>> unavailable or invalid, will attempt rediscovery
>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Discovered
>>>>>>>>>> group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>>
>>>>>>>>>> And then the pipeline stopped reading messages.
>>>>>>>>>>
>>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>>
>>>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>>   .withTopic(topic)
>>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>>>   .withoutMetadata()
>>>>>>>>>>
>>>>>>>>>> Any help will be much appreciated.
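The setup above relies on the Kafka client's own retry behavior for connection problems, as discussed earlier in the thread. How aggressively the client retries is governed by standard consumer settings, which could be merged into the `properties` map passed to updateConsumerProperties. A hedged sketch in Java (the property names are standard Kafka consumer configs; the values are illustrative only, not recommendations):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: standard Kafka consumer settings that control how the
// client retries after broker/network failures. Tune the values for your
// environment.
class ConsumerRetryProps {
    static Map<String, Object> retryProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put("reconnect.backoff.ms", "500");       // wait before reconnecting to a broker
        props.put("reconnect.backoff.max.ms", "10000"); // cap on exponential reconnect backoff
        props.put("retry.backoff.ms", "500");           // wait before retrying a failed request
        props.put("request.timeout.ms", "30000");       // fail a request that gets no response
        return props;
    }
}
```

These settings shorten or lengthen how long the client keeps retrying quietly; they do not change the KafkaIO reader bug discussed above, where a dead poll thread goes unnoticed.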
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> --
>>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>>> Data Engineer
>>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
