Thanks for the job id. I looked at the worker logs (following the usual
support on-call access protocol, which provides temporary access to things
like logs in GCP):

The root issue looks like consumerPollLoop(), mentioned earlier, needs to
handle unchecked exceptions. In your case it is clear that the poll thread
exited with a runtime exception. The reader does not check for that and
keeps waiting for the poll thread to enqueue messages. A fix should surface
an IOException from the source's read, which the runners will then handle
appropriately. I will file a JIRA.
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
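
As a rough sketch of what such a fix could look like (the class, field and
method names below are made up for illustration, this is not the actual
KafkaUnboundedReader code): the poll thread records any unchecked exception
before it exits, and the reader checks that before blocking on the records
queue, so the failure surfaces as an IOException from the source instead of
a hang.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: hypothetical names, not the real KafkaUnboundedReader internals.
class PollThreadWatcher {
  private final AtomicReference<Throwable> pollThreadFailure = new AtomicReference<>();

  // The poll loop captures any unchecked exception before the thread dies.
  void runPollLoop(Runnable pollOnce) {
    try {
      while (true) {
        pollOnce.run(); // e.g. consumer.poll(...) and enqueue records
      }
    } catch (RuntimeException e) {
      pollThreadFailure.set(e);
    }
  }

  // The reader calls this before waiting on the records queue, so a dead
  // poll thread turns into an IOException from start()/advance().
  void checkPollThread() throws IOException {
    Throwable failure = pollThreadFailure.get();
    if (failure != null) {
      throw new IOException("Kafka poll thread exited with an exception", failure);
    }
  }
}

With something along these lines, the runner sees a failing read and can
retry or fail the work item instead of silently stalling.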
From the logs (with a comment below each one):

   - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
   starting at offset 2
      - Implies the reader is initialized and the poll thread has started.
   - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
      - The reader actually got a message received by the poll thread from
      Kafka.
   - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching latest
   offset for partition kafka_topic-0. will be retried.
      - This must have happened around the time the network was disrupted.
      This log is from another periodic task that fetches latest offsets
      for partitions.

The poll thread must have died around the time the network was disrupted.

The following log comes from the Kafka client itself and is printed every
second when KafkaIO fetches the latest offsets. This log seems to have been
added in recent client versions and is probably unintentional. I don't think
there is a better way to fetch latest offsets than how KafkaIO does it now.
This is logged inside consumer.position(), called at [1].

   - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
   groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset
   for partition com.arquivei.dataeng.andre-0 to offset 3.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
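
For reference, the latest-offset fetch at [1] amounts to a seekToEnd()
followed by position() on a separate offset consumer (the
Reader-0_offset_consumer_... client seen in the log above); the "Resetting
offset for partition ..." message is logged by the Kafka client while it
resolves that position. A simplified sketch under those assumptions, not
the exact KafkaIO code (the helper class name is made up):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical helper, only to illustrate the seekToEnd()/position() pattern.
class LatestOffsetFetcher {
  static long fetchLatestOffset(Properties config, TopicPartition partition) {
    // config needs at least bootstrap.servers; deserializers are passed directly.
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      consumer.assign(Collections.singletonList(partition));
      consumer.seekToEnd(Collections.singletonList(partition));
      // position() resolves the end offset; this is where the client logs
      // "Resetting offset for partition ... to offset N."
      return consumer.position(partition);
    }
  }
}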

On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <
eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu! The job_id of our dev job is
> 2018-09-12_06_11_48-5600553605191377866.
>
> Thanks!
>
> On Wed, Sep 12, 2018 at 2:18 PM, Raghu Angadi <rang...@google.com>
> wrote:
>
>> Thanks for debugging.
>> Can you provide the job_id of your dev job? The stacktrace shows that
>> there is no thread running 'consumerPollLoop()', which can explain the
>> stuck reader. You will likely find logs at lines 594 & 587 [1]. Dataflow
>> caches its readers and the DirectRunner may not, which can explain why the
>> DirectRunner resumes reads. The expectation in KafkaIO is that the Kafka
>> client library takes care of retrying in case of connection problems (as
>> documented). It is possible that in some cases poll() throws and we need
>> to restart the client in KafkaIO.
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>
>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu, thanks for your help.
>>> Just answering your previous question, the following logs were the same
>>> as before the error, as if the pipeline were still getting the messages,
>>> for example:
>>>
>>> (...)
>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>> ERROR
>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>> (...)
>>>
>>> But when checking the Kafka consumer group, the current offset stays at
>>> 15, the committed offset of the last processed message before the error.
>>>
>>> We'll file a bug; we were now able to reproduce the issue in a dev scenario.
>>> We started the same pipeline using the direct runner, without Google
>>> Dataflow. We blocked the Kafka Broker network and the same error was
>>> thrown. Then we unblocked the network and the pipeline was able to
>>> successfully process the subsequent messages.
>>> When we started the same pipeline in the Dataflow runner and did the
>>> same test, the same problem from our production scenario happened:
>>> Dataflow couldn't process the new messages. Unfortunately, we've already
>>> stopped the Dataflow job in production, but the problematic dev job is
>>> still running and the VM's log file is attached. Thank you very much.
>>> Best regards
>>>
>>> On Tue, Sep 11, 2018 at 6:28 PM, Raghu Angadi <rang...@google.com>
>>> wrote:
>>>
>>>> Specifically, I am interested in whether you have any thread running
>>>> 'consumerPollLoop()' [1]. There should always be one (if a worker is
>>>> assigned one of the partitions). It is possible that the Kafka client
>>>> itself hasn't recovered from the group coordinator error (though unlikely).
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>
>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Eduardo,
>>>>>
>>>>> In case of any error, the pipeline should keep on trying to fetch. I
>>>>> don't know about this particular error. Do you see any others afterwards
>>>>> in the log?
>>>>> A couple of things you could try if the logs are not useful:
>>>>>  - log in to one of the VMs and get a stacktrace of the Java worker
>>>>> (look for a container called java-streaming)
>>>>>  - file a support bug or Stack Overflow question with the job id so
>>>>> that the Dataflow on-call can take a look.
>>>>>
>>>>> Raghu.
>>>>>
>>>>>
>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages
>>>>>> entirely, while our other workers from other pipelines continued to
>>>>>> get Kafka messages.
>>>>>>
>>>>>> At the moment it stopped we got these messages:
>>>>>>
>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Error sending 
>>>>>> fetch request (sessionId=1396189203, epoch=2431598) to node 3: 
>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 
>>>>>> 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, 
>>>>>> will attempt rediscovery
>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group 
>>>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>
>>>>>> And then the pipeline stopped reading the messages.
>>>>>>
>>>>>> This is the KafkaIO setup we have:
>>>>>>
>>>>>> KafkaIO.read[String,String]()
>>>>>>   .withBootstrapServers(server)
>>>>>>   .withTopic(topic)
>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>   .updateConsumerProperties(properties)
>>>>>>   .commitOffsetsInFinalize()
>>>>>>   .withoutMetadata()
>>>>>>
>>>>>>  Any help will be much appreciated.
>>>>>>
>>>>>> Best regards,
>>>>>> --
>>>>>> Eduardo Soldera Garcia
>>>>>> Data Engineer
>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>
>>>>>
>>>
>>> --
>>> Eduardo Soldera Garcia
>>> Data Engineer
>>> (16) 3509-5555 | www.arquivei.com.br
>>>
>>
>
> --
> Eduardo Soldera Garcia
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
>
