[ 
https://issues.apache.org/jira/browse/DRILL-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16604682#comment-16604682
 ] 

Abhishek Ravi edited comment on DRILL-6625 at 9/5/18 5:11 PM:
--------------------------------------------------------------

Had some time to dig further into this issue. [~ben-zvi] mentioned that the 
failures are seen when running on a Mac and that they appear to be intermittent.
h3. Issue 1 - "{{The server disconnected before a response was received}}"

In {{KafkaMessageGenerator}}, the following properties are set when creating a 
producer.
{code:java}
producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);

{code}
On a slow or heavily loaded system, the acknowledgement for a produced message 
may not arrive within 1000 ms. Since {{RETRIES_CONFIG}} is set to 0, the 
produce request then fails with {{org.apache.kafka.common.errors.NetworkException: 
The server disconnected before a response was received.}}

Although the unit tests never failed in my environment, I was able to 
reproduce this error by reducing {{REQUEST_TIMEOUT_MS_CONFIG}} to as low as 
*50 ms*.
{noformat}
23:19:55.136 [main] ERROR o.a.d.e.s.k.KafkaMessageGenerator - org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
 at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) ~[kafka-clients-0.11.0.1.jar:na]
 at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) ~[kafka-clients-0.11.0.1.jar:na]
 at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) ~[kafka-clients-0.11.0.1.jar:na]
 at org.apache.drill.exec.store.kafka.KafkaMessageGenerator.populateJsonMsgIntoKafka(KafkaMessageGenerator.java:126) ~[test-classes/:na]
 at org.apache.drill.exec.store.kafka.TestKafkaSuit.initKafka(TestKafkaSuit.java:88) [test-classes/:na]

{noformat}
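
To make the interaction between the request timeout and the retry count concrete, here is a toy model of a send. It is not Kafka code: {{SendRetryModel}}, {{firstSuccessfulAttempt}} and the latency values are hypothetical and only illustrate why a slow first acknowledgement is fatal when no retries are allowed.
{code:java}
/** Toy model of a producer send; all names and numbers are hypothetical. */
final class SendRetryModel {

    /**
     * Returns the 1-based attempt on which the send succeeds, or -1 if every
     * allowed attempt times out. ackLatenciesMs[i] is the simulated time the
     * broker takes to acknowledge attempt i.
     */
    static int firstSuccessfulAttempt(long[] ackLatenciesMs, long requestTimeoutMs, int retries) {
        // One initial attempt plus the configured number of retries.
        int allowedAttempts = Math.min(ackLatenciesMs.length, retries + 1);
        for (int i = 0; i < allowedAttempts; i++) {
            if (ackLatenciesMs[i] <= requestTimeoutMs) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] loadedSystem = {1200, 300}; // first ack is slow, second is fast
        System.out.println(firstSuccessfulAttempt(loadedSystem, 1000, 0));  // -1: timeout, no retry
        System.out.println(firstSuccessfulAttempt(loadedSystem, 1000, 1));  // 2: the retry succeeds
        System.out.println(firstSuccessfulAttempt(loadedSystem, 10000, 0)); // 1: longer timeout is enough
    }
}
{code}
In this model, either a longer timeout or a non-zero retry count is enough to get past a single slow acknowledgement.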
 
h3. Issue 2 - {{Failed to fetch messages within 200 milliseconds}}

Again, this issue may occur on slow or heavily loaded systems, when a 
consumer poll does not return messages within the configured poll timeout.

For unit tests, {{KAFKA_POLL_TIMEOUT}} is set to *200 ms*.
{code:java}
testNoResult(String.format("alter session set `%s` = %d", 
ExecConstants.KAFKA_POLL_TIMEOUT, 200));

{code}
If a poll does not return a message within this time, the following exception 
is thrown:

{{org.apache.drill.exec.rpc.RpcException: 
org.apache.drill.common.exceptions.UserRemoteException: DATA_READ ERROR: Failed 
to fetch messages within 10 milliseconds. Consider increasing the value of the 
property : store.kafka.poll.timeout}}

 

I was able to reproduce this issue by reducing {{KAFKA_POLL_TIMEOUT}} to as 
low as *50 ms*.

 
h3. Solution
 * The value of the producer {{REQUEST_TIMEOUT_MS_CONFIG}} should be increased 
(to, say, 10 s), and the value of the consumer {{KAFKA_POLL_TIMEOUT}} should be 
increased similarly.
 * We should increase the producer {{RETRIES_CONFIG}} to allow retries. We can 
eliminate the possibility of duplicate messages by using an *idempotent 
producer*.
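
A rough sketch of what these settings could look like, not the final patch. It uses plain {{java.util.Properties}} with the string keys behind the {{ProducerConfig}} constants so that it runs without the Kafka client on the classpath. The class and method names are hypothetical, the retry count of 3 and the 10 s poll timeout are illustrative values, and {{acks=all}} is included on the assumption that the idempotent producer requires it.
{code:java}
import java.util.Properties;

/** Hypothetical helper showing the proposed test settings; values are illustrative. */
final class KafkaTestSettingsSketch {

    /** Producer overrides, keyed by the strings behind the ProducerConfig constants. */
    static Properties relaxedProducerProperties() {
        Properties props = new Properties();
        props.put("request.timeout.ms", 10000);                // REQUEST_TIMEOUT_MS_CONFIG, raised from 1000
        props.put("retries", 3);                               // RETRIES_CONFIG, any non-zero value (assumption)
        props.put("max.in.flight.requests.per.connection", 1); // unchanged from the current test setup
        props.put("acks", "all");                              // assumed prerequisite for idempotence
        props.put("enable.idempotence", true);                 // avoids duplicates when a send is retried
        return props;
    }

    /** Builds the session statement used by the tests to set the consumer poll timeout. */
    static String pollTimeoutStatement(long timeoutMs) {
        return String.format("alter session set `%s` = %d", "store.kafka.poll.timeout", timeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(relaxedProducerProperties());
        System.out.println(pollTimeoutStatement(10000));
    }
}
{code}
In the actual test code the keys would come from {{ProducerConfig}} and {{ExecConstants.KAFKA_POLL_TIMEOUT}} rather than string literals.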

 

[~akumarb2010], [~kam_iitkgp], [~kkhatua] - any other suggestions?



> Intermittent failures in Kafka unit tests
> -----------------------------------------
>
>                 Key: DRILL-6625
>                 URL: https://issues.apache.org/jira/browse/DRILL-6625
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Storage - Other
>    Affects Versions: 1.13.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.15.0
>
>
> The following failures have been seen (consistently on my Mac, or 
> occasionally on Jenkins) when running the unit tests in the Kafka test suite. 
> After the failure, Maven hangs for a long time.
>  Cost was 0.0 (instead of 26.0):
> {code:java}
> Running org.apache.drill.exec.store.kafka.KafkaFilterPushdownTest
> 16:46:57.748 [main] ERROR org.apache.drill.TestReporter - Test Failed (d: 
> -65.3 KiB(73.6 KiB), h: -573.5 MiB(379.5 MiB), nh: 1.2 MiB(117.1 MiB)): 
> testPushdownWithOr(org.apache.drill.exec.store.kafka.KafkaFilterPushdownTest)
> java.lang.AssertionError: Unable to find expected string     "kafkaScanSpec" 
> : {
>       "topicName" : "drill-pushdown-topic"
>     },
>     "cost" : 26.0 in plan: {
>   "head" : {
>     "version" : 1,
>     "generator" : {
>       "type" : "ExplainHandler",
>       "info" : ""
>     },
>     "type" : "APACHE_DRILL_PHYSICAL",
>     "options" : [ {
>       "kind" : "STRING",
>       "accessibleScopes" : "ALL",
>       "name" : "store.kafka.record.reader",
>       "string_val" : 
> "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
>       "scope" : "SESSION"
>     }, {
>       "kind" : "LONG",
>       "accessibleScopes" : "ALL",
>       "name" : "planner.width.max_per_node",
>       "num_val" : 2,
>       "scope" : "SESSION"
>     }, {
>       "kind" : "BOOLEAN",
>       "accessibleScopes" : "ALL",
>       "name" : "exec.errors.verbose",
>       "bool_val" : true,
>       "scope" : "SESSION"
>     }, {
>       "kind" : "LONG",
>       "accessibleScopes" : "ALL",
>       "name" : "store.kafka.poll.timeout",
>       "num_val" : 200,
>       "scope" : "SESSION"
>     } ],
>     "queue" : 0,
>     "hasResourcePlan" : false,
>     "resultMode" : "EXEC"
>   },
>   "graph" : [ {
>     "pop" : "kafka-scan",
>     "@id" : 6,
>     "userName" : "",
>     "kafkaStoragePluginConfig" : {
>       "type" : "kafka",
>       "kafkaConsumerProps" : {
>         "bootstrap.servers" : "127.0.0.1:63751",
>         "group.id" : "drill-test-consumer"
>       },
>       "enabled" : true
>     },
>     "columns" : [ "`**`" ],
>     "kafkaScanSpec" : {
>       "topicName" : "drill-pushdown-topic"
>     },
>     "cost" : 0.0
>   }, {
> {code}
> Or occasionally:
> {code}
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> 11:52:57.571 [main] ERROR o.a.d.e.s.k.KafkaMessageGenerator - 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
