Fix: https://github.com/apache/beam/pull/6391

On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <rang...@google.com> wrote:

> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I will
> fix it later this week.
>
> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:
>
>>
>>
>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Thanks for the job ID. I looked at the worker logs (following the usual
>>> support on-call access protocol, which provides temporary access to things
>>> like logs in GCP):
>>>
>>> The root issue looks like consumerPollLoop(), mentioned earlier, needs to
>>> handle unchecked exceptions. In your case it is clear that the poll thread
>>> exited with a runtime exception. The reader does not check for this and
>>> keeps waiting for the poll thread to enqueue messages. A fix should make
>>> the read from the source fail with an IOException; the runners will then
>>> handle that appropriately. I will file a JIRA.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>
>>
>> Ignore the link; it was pasted here by mistake.
>>
>>
>>>
>>> From the logs (with a comment below each one):
>>>
>>>    - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
>>>    starting at offset 2
>>>       - Implies the reader was initialized and the poll thread was started.
>>>    - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>       - The reader actually got a message received by the poll thread
>>>       from Kafka.
>>>    - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching
>>>    latest offset for partition kafka_topic-0. will be retried.
>>>       - This must have happened around the time the network was
>>>       disrupted. This log comes from another periodic task that fetches
>>>       the latest offsets for partitions.
>>>
>>> The poll thread must have died around the time the network was disrupted.
>>>
>>> The following log comes from the Kafka client itself and is printed every
>>> second when KafkaIO fetches the latest offset. This log seems to have been
>>> added in recent versions and is probably unintentional. I don't think there
>>> is any better way to fetch the latest offsets than how KafkaIO does it now.
>>> It is logged inside consumer.position(), called at [1].
>>>
>>>    - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
>>>    groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset
>>>    for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>
>>> [1]:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>
>>
>> This 'Resetting offset' log is harmless, but it is quite annoying to see in
>> the worker logs. One way to avoid it is to set the Kafka consumer's log
>> level to WARNING. Ideally, KafkaIO itself should avoid it without requiring
>> a user option.
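A minimal sketch of the log-level workaround, assuming the Kafka client's SLF4J output ends up in `java.util.logging` (for example through the slf4j-jdk14 binding); whether that holds depends on the worker's logging setup. On Dataflow the same override should also be expressible as a pipeline option, e.g. `--workerLogLevelOverrides={"org.apache.kafka.clients.consumer":"WARN"}`. `KafkaLogLevel` and `quietKafkaConsumer()` are hypothetical names, and it is assumed the 'Resetting offset' lines are logged at INFO.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper that raises the threshold for the Kafka consumer's loggers.
final class KafkaLogLevel {
  // java.util.logging holds loggers weakly; keep a strong reference so the
  // level set below is not lost if this logger is garbage-collected.
  static final Logger KAFKA_CONSUMER =
      Logger.getLogger("org.apache.kafka.clients.consumer");

  static void quietKafkaConsumer() {
    // Descendant loggers without their own level inherit WARNING,
    // so INFO-level lines from the consumer are filtered out.
    KAFKA_CONSUMER.setLevel(Level.WARNING);
  }
}
```

With the default `java.util.logging` configuration, after `quietKafkaConsumer()` a child logger such as `org.apache.kafka.clients.consumer.internals.Fetcher` (an illustrative name) reports `isLoggable(Level.INFO)` as false and `isLoggable(Level.WARNING)` as true.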
>>
>>
>>
>>
>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
>>>> Hi Raghu! The job_id of our dev job is
>>>> 2018-09-12_06_11_48-5600553605191377866.
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Sep 12, 2018 at 2:18 PM Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>>
>>>>> Thanks for debugging.
>>>>> Can you provide the job_id of your dev job? The stack trace shows that
>>>>> there is no thread running 'consumerPollLoop()', which can explain the
>>>>> stuck reader. You will likely find logs at lines 594 and 587 [1]. Dataflow
>>>>> caches its readers and the DirectRunner may not; that can explain why the
>>>>> DirectRunner resumes reading. The expectation in KafkaIO is that the Kafka
>>>>> client library takes care of retrying in case of connection problems (as
>>>>> documented). It is possible that in some cases poll() throws and we need
>>>>> to restart the client in KafkaIO.
>>>>>
>>>>> [1]:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
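One way to restart the client, sketched in plain Java. `PollingClient` is a hypothetical stand-in for the Kafka consumer and `RestartingPoller` a hypothetical wrapper; neither is KafkaIO's API. When `poll()` throws, the wrapper closes the client and builds a new one from a factory, up to a fixed number of restarts, then rethrows so the caller (ultimately the runner) can handle the failure. Note that a real `KafkaConsumer` is not thread-safe, so closing and recreating it has to happen on the thread that polls it.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka consumer: only the calls this sketch needs.
interface PollingClient extends AutoCloseable {
  List<String> poll();

  @Override
  void close(); // narrowed: no checked exception
}

// Hypothetical wrapper: if poll() throws, close the client and create a fresh one,
// up to maxRestarts times, then rethrow so the caller sees the error.
final class RestartingPoller {
  private final Supplier<PollingClient> factory;
  private final int maxRestarts;
  private PollingClient client;
  private int restarts;

  RestartingPoller(Supplier<PollingClient> factory, int maxRestarts) {
    this.factory = factory;
    this.maxRestarts = maxRestarts;
    this.client = factory.get();
  }

  List<String> poll() {
    while (true) {
      try {
        return client.poll();
      } catch (RuntimeException e) {
        if (restarts >= maxRestarts) {
          throw e; // give up: surface the failure instead of retrying forever
        }
        restarts++;
        try {
          client.close();
        } catch (RuntimeException ignored) {
          // best effort: the old client may already be broken
        }
        client = factory.get();
      }
    }
  }

  int restarts() {
    return restarts;
  }
}
```

Bounding the restarts is a design choice: an unbounded loop would hide a permanent failure as silently as the dead poll thread did, whereas rethrowing lets the runner apply its own retry policy.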
>>>>>
>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi Raghu, thanks for your help.
>>>>>> To answer your previous question: the logs that followed were the
>>>>>> same as before the error, as if the pipeline were still getting
>>>>>> messages, for example:
>>>>>>
>>>>>> (...)
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>> ERROR
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>> (...)
>>>>>>
>>>>>> But when checking the Kafka consumer group, the current offset stays
>>>>>> at 15, the committed offset of the last message processed before the
>>>>>> error.
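A small worked example of why the lag, not the 'Resetting offset' lines, reveals the stall. Assuming the offsets in those lines are the partition's latest (log-end) offsets, as explained elsewhere in this thread, consumer lag is the log-end offset minus the committed offset. With the committed offset frozen at 15, the samples 15, 22 and 30 give lags of 0, 7 and 15, so the backlog grows while nothing is consumed. `ConsumerLag` is a hypothetical helper.

```java
import java.util.Arrays;

// Hypothetical helper for consumer lag on a single partition.
final class ConsumerLag {
  // Records between the committed offset and the partition's log-end offset.
  static long lag(long logEndOffset, long committedOffset) {
    if (committedOffset > logEndOffset) {
      throw new IllegalArgumentException("committed offset is past the log end");
    }
    return logEndOffset - committedOffset;
  }

  // Lag for each sampled log-end offset against one (frozen) committed offset.
  static long[] lags(long[] logEndOffsets, long committedOffset) {
    return Arrays.stream(logEndOffsets).map(end -> lag(end, committedOffset)).toArray();
  }
}
```

`ConsumerLag.lags(new long[] {15, 22, 30}, 15)` returns `[0, 7, 15]`: a steadily increasing lag with a constant committed offset is the signature of a reader that has stopped consuming.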
>>>>>>
>>>>>> We'll file a bug, but we have now been able to reproduce the issue in a
>>>>>> dev scenario.
>>>>>> We started the same pipeline using the DirectRunner, without Google
>>>>>> Dataflow. We blocked the Kafka broker's network and the same error was
>>>>>> thrown. Then we unblocked the network and the pipeline was able to
>>>>>> successfully process the subsequent messages.
>>>>>> When we started the same pipeline on the Dataflow runner and ran the
>>>>>> same test, the same problem as in our production scenario happened:
>>>>>> Dataflow couldn't process the new messages. Unfortunately, we've stopped
>>>>>> the Dataflow job in production, but the problematic dev job is still
>>>>>> running and the VM's log file is attached. Thank you very much.
>>>>>> Best regards
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 6:28 PM Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Specifically, I am interested in whether you have any thread running
>>>>>>> 'consumerPollLoop()' [1]. There should always be one (if a worker is
>>>>>>> assigned one of the partitions). It is possible that the Kafka client
>>>>>>> itself hasn't recovered from the group coordinator error (though that is
>>>>>>> unlikely).
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
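The check being asked for amounts to looking for a stack frame named `consumerPollLoop` in the worker's thread dump (for example one taken with `jstack` on the VM). The same idea can be sketched in-process with `Thread.getAllStackTraces()`; `PollThreadCheck` is a hypothetical helper, and it only sees threads of the JVM it runs in.

```java
import java.util.Map;

// Hypothetical helper: reports whether any live thread in this JVM is currently
// executing (or blocked inside) a method with the given name.
final class PollThreadCheck {
  static boolean anyThreadInMethod(String methodName) {
    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
      for (StackTraceElement frame : entry.getValue()) {
        if (frame.getMethodName().equals(methodName)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

On a healthy reader that has a partition assigned, `anyThreadInMethod("consumerPollLoop")` would be expected to return true; a false result matches the stuck-reader symptom discussed in this thread.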
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Eduardo,
>>>>>>>>
>>>>>>>> In case of any error, the pipeline should keep trying to fetch.
>>>>>>>> I don't know about this particular error. Do you see any others
>>>>>>>> afterwards in the log?
>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>>  - log in to one of the VMs and get a stack trace of the Java worker
>>>>>>>> (look for a container called java-streaming)
>>>>>>>>  - file a support bug or a Stack Overflow question with the job ID so
>>>>>>>> that the Dataflow on-call can take a look.
>>>>>>>>
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>>>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> We have an Apache Beam pipeline running on Google Dataflow using
>>>>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages
>>>>>>>>> entirely, while our workers from other pipelines continued to get
>>>>>>>>> Kafka messages.
>>>>>>>>>
>>>>>>>>> At the moment it stopped we got these messages:
>>>>>>>>>
>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Error sending fetch request (sessionId=1396189203, epoch=2431598) to node 3: org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>
>>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>>
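>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>
>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO
>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer
>>>>>>>>>
>>>>>>>>> KafkaIO.read[String,String]()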
>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>   .withTopic(topic)
>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>   .commitOffsetsInFinalize()
>>>>>>>>>   .withoutMetadata()
>>>>>>>>>
>>>>>>>>>  Any help will be much appreciated.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> --
>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>> Data Engineer
>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
>>>>>>>>> [image: Arquivei.com.br – Intelligence in Invoices]
>>>>>>>>> <https://arquivei.com.br/?utm_campaign=assinatura-email&utm_content=assinatura>
>>>>>>>>> [image: Google selects Arquivei for immersion and mentoring in
>>>>>>>>> Silicon Valley]
>>>>>>>>> <https://arquivei.com.br/blog/google-seleciona-arquivei/?utm_campaign=assinatura-email-launchpad&utm_content=assinatura-launchpad>
>>>>>>>>> <https://www.facebook.com/arquivei>
>>>>>>>>> <https://www.linkedin.com/company/arquivei>
>>>>>>>>> <https://www.youtube.com/watch?v=sSUUKxbXnxk>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
