Is it feasible for you to verify the fix in your dev job? I can make a patch against the Beam 2.4 branch if you like.
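For context, the failure mode discussed below is a background poll thread dying with an unchecked exception while the reader keeps waiting for it to enqueue messages. A minimal, simplified sketch of the propagation pattern such a fix needs, using only the JDK; this is illustrative and not the actual KafkaUnboundedReader code, and names like `PollLoopSketch` are made up:

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a reader backed by a background poll loop. If the
// loop dies with an unchecked exception, the reader must surface it as an
// IOException instead of blocking forever on an empty queue.
public class PollLoopSketch {
  private final BlockingQueue<String> records = new LinkedBlockingQueue<>();
  private final AtomicReference<Throwable> pollError = new AtomicReference<>();

  // Background poll loop: record any unchecked exception instead of losing it.
  void start(Runnable pollOnce) {
    Thread t = new Thread(() -> {
      try {
        while (true) {
          pollOnce.run(); // may throw, e.g. when the network is disrupted
        }
      } catch (RuntimeException e) {
        pollError.set(e); // remember the failure for the reader to find
      }
    });
    t.setDaemon(true);
    t.start();
  }

  void enqueue(String record) {
    records.add(record);
  }

  // Reader side: wait with a timeout and re-check the error flag, so a dead
  // poll thread becomes an IOException rather than an indefinite wait.
  String read() throws IOException {
    try {
      while (true) {
        String r = records.poll(100, TimeUnit.MILLISECONDS);
        if (r != null) {
          return r;
        }
        Throwable t = pollError.get();
        if (t != null) {
          throw new IOException("poll thread failed", t);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
  }
}
```

With this shape the runner sees an IOException from the source and can retry or fail the read, instead of the reader silently stalling.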
Raghu.

On Fri, Sep 14, 2018 at 11:14 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thank you very much for the pull request.
> We'll wait for the 2.7 Beam release.
>
> Regards!
>
> On Thu, Sep 13, 2018 at 6:19 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Fix: https://github.com/apache/beam/pull/6391
>>
>> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>.
>>> I will fix it later this week.
>>>
>>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>>> Thanks for the job id. I looked at the worker logs (following the
>>>>> usual support on-call access protocol that provides temporary access
>>>>> to things like logs in GCP).
>>>>>
>>>>> The root issue looks like consumerPollLoop(), mentioned earlier,
>>>>> needs to handle unchecked exceptions. In your case it is clear that
>>>>> the poll thread exited with a runtime exception. The reader does not
>>>>> check for it and continues to wait for the poll thread to enqueue
>>>>> messages. A fix should result in an IOException for reads from the
>>>>> source; the runners will handle that appropriately. I will file a JIRA.
>>>>>
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>
>>>> Ignore the link, it was pasted here by mistake.
>>>>
>>>>> From the logs (with a comment below each one):
>>>>>
>>>>> - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0 starting at offset 2
>>>>>   - Implies the reader is initialized and the poll thread is started.
>>>>> - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>>>   - The reader actually got a message received by the poll thread from Kafka.
>>>>> - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching latest offset for partition kafka_topic-0. will be retried.
>>>>>   - This must have happened around the time the network was
>>>>>     disrupted. The log itself is from another periodic task that
>>>>>     fetches the latest offsets for partitions.
>>>>>
>>>>> The poll thread must have died around the time the network was disrupted.
>>>>>
>>>>> The following log comes from the Kafka client itself and is printed
>>>>> every second when KafkaIO fetches the latest offset. It seems to have
>>>>> been added in recent client versions and is probably unintentional.
>>>>> I don't think there is a better way to fetch latest offsets than how
>>>>> KafkaIO does it now. This is logged inside consumer.position(), called
>>>>> at [1]:
>>>>>
>>>>> - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>>>
>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>
>>>> This 'Resetting offset' log is harmless, but quite annoying to see in
>>>> the worker logs. One way to avoid it is to set the Kafka consumer's
>>>> log level to WARNING. Ideally KafkaIO itself should avoid it without
>>>> a user option.
>>>>
>>>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi Raghu! The job_id of our dev job is 2018-09-12_06_11_48-5600553605191377866.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Sep 12, 2018 at 2:18 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>
>>>>>>> Thanks for debugging.
>>>>>>> Can you provide the job_id of your dev job? The stacktrace shows
>>>>>>> that there is no thread running 'consumerPollLoop()', which can
>>>>>>> explain the stuck reader.
>>>>>>> You will likely find logs at lines 594 & 587 [1]. Dataflow caches
>>>>>>> its readers and DirectRunner may not; that can explain why
>>>>>>> DirectRunner resumes reads. The expectation in KafkaIO is that the
>>>>>>> Kafka client library takes care of retrying in case of connection
>>>>>>> problems (as documented). It is possible that in some cases poll()
>>>>>>> throws and we need to restart the client in KafkaIO.
>>>>>>>
>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>>>>
>>>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>
>>>>>>>> Hi Raghu, thanks for your help.
>>>>>>>> Just answering your previous question: the following logs were the
>>>>>>>> same as before the error, as if the pipeline were still getting
>>>>>>>> the messages, for example:
>>>>>>>>
>>>>>>>> (...)
>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>>>> ERROR
>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>>>> (...)
>>>>>>>>
>>>>>>>> But when checking the Kafka consumer group, the current offset
>>>>>>>> stays at 15, the committed offset of the last message processed
>>>>>>>> before the error.
>>>>>>>>
>>>>>>>> We'll file a bug, but we were now able to reproduce the issue in a
>>>>>>>> dev scenario. We started the same pipeline using the direct runner,
>>>>>>>> without Google Dataflow. We blocked the Kafka broker network and
>>>>>>>> the same error was thrown. Then we unblocked the network and the
>>>>>>>> pipeline was able to successfully process the subsequent messages.
>>>>>>>> When we started the same pipeline in the Dataflow runner and did
>>>>>>>> the same test, the same problem from our production scenario
>>>>>>>> happened: Dataflow couldn't process the new messages.
>>>>>>>> Unfortunately, we've stopped the Dataflow job in production, but
>>>>>>>> the problematic dev job is still running and the log file of the
>>>>>>>> VM is attached. Thank you very much.
>>>>>>>> Best regards
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 6:28 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Specifically, I am interested in whether you have any thread
>>>>>>>>> running 'consumerPollLoop()' [1]. There should always be one (if
>>>>>>>>> a worker is assigned one of the partitions). It is possible that
>>>>>>>>> the Kafka client itself hasn't recovered from the group
>>>>>>>>> coordinator error (though unlikely).
>>>>>>>>>
>>>>>>>>> [1]: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Eduardo,
>>>>>>>>>>
>>>>>>>>>> In case of any error, the pipeline should keep trying to fetch.
>>>>>>>>>> I don't know about this particular error. Do you see any others
>>>>>>>>>> afterwards in the log?
>>>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>>>> - Log in to one of the VMs and get a stacktrace of the Java
>>>>>>>>>>   worker (look for a container called java-streaming).
>>>>>>>>>> - File a support bug or Stack Overflow question with the job id
>>>>>>>>>>   so that the Dataflow on-call can take a look.
>>>>>>>>>>
>>>>>>>>>> Raghu.
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow
>>>>>>>>>>> using KafkaIO.
>>>>>>>>>>> Suddenly the pipeline stopped fetching Kafka messages
>>>>>>>>>>> altogether, while our workers from other pipelines continued
>>>>>>>>>>> to get Kafka messages.
>>>>>>>>>>>
>>>>>>>>>>> At the moment it stopped, we got these messages:
>>>>>>>>>>>
>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Error sending fetch request (sessionId=1396189203, epoch=2431598) to node 3: org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
>>>>>>>>>>> I [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>>>
>>>>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>>>>
>>>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>>>
>>>>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>>>   .withTopic(topic)
>>>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>>>>   .withoutMetadata()
>>>>>>>>>>>
>>>>>>>>>>> Any help will be much appreciated.
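On the `.updateConsumerProperties(properties)` call in the setup above: KafkaIO expects the Kafka client library itself to retry through connection problems, and that behavior is tuned via the consumer properties map. A small sketch of such a map; the keys are standard Kafka consumer configs, but the values here are made-up examples rather than tested recommendations:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative consumer properties of the kind passed to KafkaIO's
// updateConsumerProperties(). Keys are standard Kafka consumer configs;
// the values are example settings only, not recommendations.
public class ConsumerProps {
  static Map<String, Object> example() {
    Map<String, Object> props = new HashMap<>();
    props.put("reconnect.backoff.ms", 1000);      // initial broker reconnect backoff
    props.put("reconnect.backoff.max.ms", 10000); // cap on the exponential backoff
    props.put("request.timeout.ms", 30000);       // give up on a request after 30s
    return props;
  }
}
```

Whether different backoff or timeout values would have changed anything in this particular disconnect is untested; as the thread concludes, the real issue is the poll thread's failure not being surfaced.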
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> --
>>>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>>>> Data Engineer
>>>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br