Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I will
fix it later this week.
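Roughly, the fix would follow a standard pattern for a reader backed by a
background poll loop: the poll thread records any unchecked exception it
dies with, and the reader's advance path rethrows it as an IOException
instead of blocking forever on an empty queue. A minimal, self-contained
sketch of that pattern (class and method names here are hypothetical, not
the actual KafkaUnboundedReader code):

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a reader whose background poll loop records its
// failure so the consuming side can surface it instead of waiting forever.
class PollingReader {
    final BlockingQueue<String> records = new ArrayBlockingQueue<>(16);
    final AtomicReference<Throwable> pollError = new AtomicReference<>();
    final Thread pollThread;

    PollingReader(Runnable pollLoop) {
        pollThread = new Thread(() -> {
            try {
                pollLoop.run(); // stands in for consumerPollLoop()
            } catch (RuntimeException e) {
                pollError.set(e); // record the failure instead of dying silently
            }
        });
        pollThread.setDaemon(true);
        pollThread.start();
    }

    // Fail fast if the poll thread died; otherwise return the next record
    // (or null on timeout). The IOException is what a runner would see.
    String advance() throws IOException, InterruptedException {
        Throwable t = pollError.get();
        if (t != null) {
            throw new IOException("consumer poll loop failed", t);
        }
        return records.poll(100, TimeUnit.MILLISECONDS);
    }

    // Demo: a poll loop that dies immediately; advance() should then throw.
    static boolean pollFailureIsSurfaced() throws InterruptedException {
        PollingReader r = new PollingReader(() -> {
            throw new RuntimeException("simulated broker disconnect");
        });
        r.pollThread.join(); // wait until the poll thread has exited
        try {
            r.advance();
            return false; // old behavior: the error was never surfaced
        } catch (IOException expected) {
            return true;
        }
    }
}
```

With the check in place, a dead poll thread turns into an error on the next
read, which the runner can retry, rather than a reader that silently stalls.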

On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rang...@google.com> wrote:

>
>
> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks for the job id. I looked at the worker logs (following the usual
>> support oncall protocol that provides temporary access to things like
>> logs in GCP):
>>
>> Root issue looks like consumerPollLoop(), mentioned earlier, needs to
>> handle unchecked exceptions. In your case it is clear that the poll thread
>> exited with a runtime exception. The reader does not check for that and
>> continues to wait for the poll thread to enqueue messages. A fix should
>> surface an IOException on reads from the source; the runners will then
>> handle it appropriately. I will file a JIRA.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>
>
> Ignore the link.. was pasted here by mistake.
>
>
>>
>> From the logs (with a comment below each one):
>>
>>    - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
>>    starting at offset 2
>>       - Implies the reader is initialized and poll thread is started.
>>    - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>       - The reader actually got a message received by the poll thread
>>       from Kafka.
>>    - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching
>>    latest offset for partition kafka_topic-0. will be retried.
>>       - This must have happened around the time the network was
>>       disrupted. The actual log is from another periodic task that
>>       fetches the latest offsets for partitions.
>>
>> The poll thread must have died around the time the network was disrupted.
>>
>> The following log comes from the Kafka client itself and is printed every
>> second when KafkaIO fetches the latest offset. It seems to have been added
>> in recent versions and is probably unintentional. I don't think there is
>> any better way to fetch the latest offsets than how KafkaIO does it now.
>> This is logged inside consumer.position(), called at [1].
>>
>>    - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
>>    groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset
>>    for partition com.arquivei.dataeng.andre-0 to offset 3.
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>
>
> This 'Resetting offset' log is harmless, but quite annoying to see in the
> worker logs. One way to avoid it is to set the Kafka consumer's log level
> to WARNING. Ideally KafkaIO itself should avoid it without requiring a
> user option.
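For example, if the worker routes logs through java.util.logging, raising
the level of the logger that emits these lines suppresses them. This is a
sketch under two assumptions: that the message comes from the Kafka client's
Fetcher class, and that the worker's logging backend honors a JUL level
(with a log4j backend the equivalent would be a log4j.properties override):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: silence the per-second "Resetting offset for partition ..." INFO
// lines by raising the Kafka fetcher's logger to WARNING. Assumption: the
// log originates from the Fetcher class and the worker uses
// java.util.logging; with a log4j backend the equivalent is roughly
//   log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=WARN
class KafkaLogTuning {
    static final Logger FETCHER_LOG =
        Logger.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher");

    static void quietOffsetResets() {
        FETCHER_LOG.setLevel(Level.WARNING);
    }
}
```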
>
>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <
>> eduardo.sold...@arquivei.com.br> wrote:
>>
>>> Hi Raghu! The job_id of our dev job is
>>> 2018-09-12_06_11_48-5600553605191377866.
>>>
>>> Thanks!
>>>
>>> Em qua, 12 de set de 2018 às 14:18, Raghu Angadi <rang...@google.com>
>>> escreveu:
>>>
>>>> Thanks for debugging.
>>>> Can you provide the job_id of your dev job? The stacktrace shows that
>>>> there is no thread running 'consumerPollLoop()', which can explain the
>>>> stuck reader. You will likely find logs at lines 594 & 587 [1]. Dataflow
>>>> caches its readers and DirectRunner may not, which can explain why
>>>> DirectRunner resumes reads. The expectation in KafkaIO is that the Kafka
>>>> client library takes care of retrying in case of connection problems (as
>>>> documented). It is possible that in some cases poll() throws and we need
>>>> to restart the client in KafkaIO.
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
>>>>
>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>
>>>>> Hi Raghu, thanks for your help.
>>>>> To answer your previous question: the logs after the error were the
>>>>> same as before it, as if the pipeline were still getting the messages,
>>>>> for example:
>>>>>
>>>>> (...)
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset
>>>>> 10.
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset
>>>>> 15.
>>>>> ERROR
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset
>>>>> 22.
>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset
>>>>> 30.
>>>>> (...)
>>>>>
>>>>> But when checking the Kafka consumer group, the current offset stays
>>>>> at 15, the committed offset from the last message processed before the
>>>>> error.
>>>>>
>>>>> We'll file a bug; we were now able to reproduce the issue in a dev
>>>>> scenario.
>>>>> We started the same pipeline using the direct runner, without Google
>>>>> Dataflow. We blocked the Kafka broker network and the same error was
>>>>> thrown. Then we unblocked the network and the pipeline was able to
>>>>> successfully process the subsequent messages.
>>>>> When we started the same pipeline in the Dataflow runner and did the
>>>>> same test, the same problem from our production scenario happened:
>>>>> Dataflow couldn't process the new messages. Unfortunately, we've
>>>>> stopped the Dataflow job in production, but the problematic dev job is
>>>>> still running and the log file of the VM is attached. Thank you very
>>>>> much.
>>>>> Best regards
>>>>>
>>>>> Em ter, 11 de set de 2018 às 18:28, Raghu Angadi <rang...@google.com>
>>>>> escreveu:
>>>>>
>>>>>> Specifically, I am interested in whether you have any thread running
>>>>>> 'consumerPollLoop()' [1]. There should always be one (if a worker is
>>>>>> assigned one of the partitions). It is possible that the Kafka client
>>>>>> itself hasn't recovered from the group coordinator error (though
>>>>>> unlikely).
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Eduardo,
>>>>>>>
>>>>>>> In case of any error, the pipeline should keep trying to fetch. I
>>>>>>> don't know about this particular error. Do you see any others after
>>>>>>> it in the log?
>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>  - log in to one of the VMs and get a stacktrace of the java worker
>>>>>>> (look for a container called java-streaming)
>>>>>>>  - file a support bug or Stack Overflow question with the job id so
>>>>>>> that the Dataflow oncall can take a look.
>>>>>>>
>>>>>>> Raghu.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>>>>>> eduardo.sold...@arquivei.com.br> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages entirely, while
>>>>>>>> our
>>>>>>>> other workers from other pipelines continued to get Kafka messages.
>>>>>>>>
>>>>>>>> At the moment it stopped we got these messages:
>>>>>>>>
>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Error sending 
>>>>>>>> fetch request (sessionId=1396189203, epoch=2431598) to node 3: 
>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Group 
>>>>>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable 
>>>>>>>> or invalid, will attempt rediscovery
>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Discovered 
>>>>>>>> group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>
>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>
>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>
>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>   .withTopic(topic)
>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>   .commitOffsetsInFinalize() // commit offsets to Kafka when checkpoints finalize
>>>>>>>>   .withoutMetadata() // drop Kafka record metadata, keeping just key/value pairs
>>>>>>>>
>>>>>>>>  Any help will be much appreciated.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> --
>>>>>>>> Eduardo Soldera Garcia
>>>>>>>> Data Engineer
>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
