Thanks for debugging.
Can you provide the job_id of your dev job? The stacktrace shows that there
is no thread running 'consumerPollLoop()', which could explain the stuck reader.
You will likely find logs from lines 594 and 587 [1]. Dataflow caches its
readers and DirectRunner may not, which could explain why DirectRunner resumes
reading. The expectation in KafkaIO is that the Kafka client library takes care
of retrying in case of connection problems (as documented). It is possible
that in some cases poll() throws and we need to restart the client in
KafkaIO.
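
To make the last point concrete, here is a minimal sketch of what "restart the
client when poll() throws" could look like. This is not KafkaIO's actual code:
PollingClient, FlakyClient, and RestartingPoller are hypothetical stand-ins
written only with the JDK, and the real KafkaConsumer API is not used here.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a consumer client; NOT the real KafkaConsumer API. */
interface PollingClient extends AutoCloseable {
    List<String> poll() throws Exception;

    @Override
    void close();
}

/** Hypothetical test double: fails every poll if constructed with failing=true. */
final class FlakyClient implements PollingClient {
    private final boolean failing;

    FlakyClient(boolean failing) {
        this.failing = failing;
    }

    @Override
    public List<String> poll() throws Exception {
        if (failing) {
            throw new IOException("simulated disconnect");
        }
        return List.of("record");
    }

    @Override
    public void close() {}
}

/** Wraps a client and replaces it with a fresh one whenever poll() throws. */
final class RestartingPoller {
    private final Supplier<PollingClient> factory;
    private PollingClient client;
    private int restarts = 0;

    RestartingPoller(Supplier<PollingClient> factory) {
        this.factory = factory;
        this.client = factory.get();
    }

    /** Polls once. On failure: close the old client, create a new one, return no records. */
    List<String> pollOnce() {
        try {
            return client.poll();
        } catch (Exception e) {
            client.close();
            client = factory.get();
            restarts++;
            return Collections.emptyList();
        }
    }

    int restarts() {
        return restarts;
    }
}

final class RestartDemo {
    public static void main(String[] args) {
        // The first client handed out fails; every later one works.
        boolean[] firstClient = {true};
        RestartingPoller poller = new RestartingPoller(() -> {
            boolean failing = firstClient[0];
            firstClient[0] = false;
            return new FlakyClient(failing);
        });
        System.out.println(poller.pollOnce());
        System.out.println(poller.pollOnce());
        System.out.println("restarts: " + poller.restarts());
    }
}
```

A real fix would also have to decide which exceptions are worth a restart and
how to back off between attempts; the sketch ignores both.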

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594

On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
eduardo.sold...@arquivei.com.br> wrote:

> Hi Raghu, thanks for your help.
> Just answering your previous question, the following logs were the same as
> before the error, as if the pipeline were still getting the messages, for
> example:
>
> (...)
> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
> ERROR
> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
> (...)
>
> But when checking the Kafka Consumer Group, the current offset stays at
> 15, the committed offset from the last processed message before the error.
>
> We'll file a bug, but we could now reproduce the issue in a Dev scenario.
> We started the same pipeline using the direct runner, without Google
> Dataflow. We blocked the Kafka Broker network and the same error was
> thrown. Then we unblocked the network and the pipeline was able to
> successfully process the subsequent messages.
> When we started the same pipeline in the Dataflow runner and did the same
> test, the same problem from our production scenario happened, Dataflow
> couldn't process the new messages. Unfortunately, we've stopped the
> dataflow job in production, but the problematic dev job is still running
> and the log file of the VM is attached. Thank you very much.
> Best regards
>
> On Tue, Sep 11, 2018 at 18:28, Raghu Angadi <rang...@google.com>
> wrote:
>
>> Specifically, I am interested if you have any thread running
>> 'consumerPollLoop()' [1]. There should always be one (if a worker is
>> assigned one of the partitions). It is possible that KafkaClient itself
>> hasn't recovered from the group coordinator error (though unlikely).
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>
>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Hi Eduardo,
>>>
>>> In case of any error, the pipeline should keep on trying to fetch. I
>>> don't know about this particular error. Do you see any others afterwards in
>>> the log?
>>> A couple of things you could try if the logs are not useful:
>>>  - log in to one of the VMs and get a stack trace of the Java worker (look
>>> for a container called java-streaming)
>>>  - file a support bug or Stack Overflow question with the job ID so that
>>> Dataflow oncall can take a look.
>>>
>>> Raghu.
>>>
>>>
>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>> eduardo.sold...@arquivei.com.br> wrote:
>>>
>>>> Hi,
>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages at all, while
>>>> our other workers from other pipelines continued to get Kafka messages.
>>>>
>>>> At the moment it stopped we got these messages:
>>>>
>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Error sending fetch 
>>>> request (sessionId=1396189203, epoch=2431598) to node 3: 
>>>> org.apache.kafka.common.errors.DisconnectException.
>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 
>>>> 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, 
>>>> will attempt rediscovery
>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group 
>>>> coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>
>>>> And then the pipeline stopped reading the messages.
>>>>
>>>> This is the KafkaIO setup we have:
>>>>
>>>> KafkaIO.read[String,String]()
>>>>   .withBootstrapServers(server)
>>>>   .withTopic(topic)
>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>   .updateConsumerProperties(properties)
>>>>   .commitOffsetsInFinalize()
>>>>   .withoutMetadata()
>>>>
>>>>  Any help will be much appreciated.
>>>>
>>>> Best regards,
>>>> --
>>>> Eduardo Soldera Garcia
>>>> Data Engineer
>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>
>>>
>
> --
> Eduardo Soldera Garcia
> Data Engineer
> (16) 3509-5555 | www.arquivei.com.br
>
