[ https://issues.apache.org/jira/browse/SPARK-20780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jayadeepj updated SPARK-20780:
------------------------------
    Attachment: streaming_1.png
                streaming_2.png
                tasks_timing_out_3.png

> Spark Kafka10 Consumer Hangs
> ----------------------------
>
>                 Key: SPARK-20780
>                 URL: https://issues.apache.org/jira/browse/SPARK-20780
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
> Spark Streaming Kafka 010
> CDH 5.8.4
> CentOS Linux release 7.2
>            Reporter: jayadeepj
>            Priority: Critical
>         Attachments: streaming_1.png, streaming_2.png, tasks_timing_out_3.png
>
>
> We recently upgraded our streaming app, which uses a direct stream, to Spark 2
> (spark-streaming-kafka-0-10 - 2.1.0) with Kafka 0.10.0.0 and the 0.10
> consumer. We see abnormal delays after the application has run for a couple
> of hours or has consumed roughly 5 million records.
> See screenshots 1 & 2.
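> For context, here is a minimal sketch of how such a stream is typically
> created with the 0-10 integration (a sketch only: the topic name, servers,
> and variable names are placeholders rather than our actual code, and 'ssc'
> is an existing StreamingContext; the consumer settings mirror the
> ConsumerConfig dump further below):
>     import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
>     import org.apache.spark.streaming.kafka010.KafkaUtils
>     import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>     import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "xxxxx.xxx.xxx:9092",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[ByteArrayDeserializer],
>       "group.id" -> "default1",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (true: java.lang.Boolean),
>       "request.timeout.ms" -> "50000")
>
>     val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
>       ssc, PreferConsistent,
>       Subscribe[String, Array[Byte]](Seq("XX-XXX-XX"), kafkaParams))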
> There is a sudden jump in processing time from ~15 seconds (usual for this
> app) to ~3 minutes, and from then on the processing time keeps degrading.
> We have seen that the delay is due to certain tasks taking exactly as long as
> the configured Kafka consumer 'request.timeout.ms'. We confirmed this by
> varying the timeout property across several values and observing the task
> durations change to match.
> See screenshot 3.
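> (Illustration only, reusing the hypothetical 'kafkaParams' map from the
> sketch above: varying the timeout amounts to changing a single entry, and the
> stalled task durations tracked whatever value we set.)
>     // e.g. tried 30000, 50000, 120000 ms; stalled tasks ran for exactly that long
>     val tunedParams = kafkaParams + ("request.timeout.ms" -> "120000")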
> I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method and
> the subsequent poll(timeout) call in CachedKafkaConsumer.scala are actually
> timing out on some of the partitions without reading any data, yet the
> executor logs the task as successfully completed after exactly the timeout
> duration. Note that most other tasks complete successfully in milliseconds.
> The timeout most likely originates inside
> org.apache.kafka.clients.consumer.KafkaConsumer, and we did not observe any
> difference in network latency.
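> To make the suspected path concrete, here is a condensed paraphrase of the
> relevant code (simplified from Spark 2.1's kafka010.CachedKafkaConsumer, not
> verbatim; error handling and logging trimmed):
>     def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>       if (offset != nextOffset) {      // cache miss: the "Initial fetch" log line
>         seek(offset)
>         poll(timeout)                  // may block for up to the full timeout
>       }
>       if (!buffer.hasNext()) { poll(timeout) }
>       assert(buffer.hasNext(), s"Failed to get records after polling for $timeout")
>       var record = buffer.next()
>       if (record.offset != offset) {   // buffer miss: seek back and poll again
>         seek(offset)
>         poll(timeout)
>         record = buffer.next()
>       }
>       nextOffset = offset + 1
>       record
>     }
>
>     private def poll(timeout: Long): Unit = {
>       // delegates to org.apache.kafka.clients.consumer.KafkaConsumer#poll
>       val p = consumer.poll(timeout)
>       buffer = p.records(topicPartition).iterator
>     }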
> We have observed this across multiple clusters and multiple apps, both with
> and without TLS/SSL. Spark 1.6 with the 0-8 consumer seems fine, with
> consistent performance. An executor log excerpt follows; note the 50-second
> gap between the "Initial fetch" line and the task finishing:
> 17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
> task 446288
> 17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 
> (TID 446288)
> 17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, 
> partition 0 offsets 776843 -> 779591
> 17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for 
> spark-executor-default1 XX-XXX-XX 0 776843
> 17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 
> (TID 446288). 1699 bytes result sent to driver
> 17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
> task 446329
> 17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 
> (TID 446329)
> 17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 
> and clearing cache
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast 
> variable 6807
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored 
> as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 
> 6807 took 4 ms
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as 
> values in m
> In the excerpt above, the timestamps differ by exactly the configured timeout
> duration: the task starts its initial fetch at 10:30:06 and finishes at
> 10:30:56, i.e. 50 seconds, matching request.timeout.ms = 50000.
> Our consumer config is below.
> 17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
> 17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
>       metric.reporters = []
>       metadata.max.age.ms = 300000
>       partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>       reconnect.backoff.ms = 50
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       max.partition.fetch.bytes = 1048576
>       bootstrap.servers = [xxxxx.xxx.xxx:9092]
>       ssl.keystore.type = JKS
>       enable.auto.commit = true
>       sasl.mechanism = GSSAPI
>       interceptor.classes = null
>       exclude.internal.topics = true
>       ssl.truststore.password = null
>       client.id =
>       ssl.endpoint.identification.algorithm = null
>       max.poll.records = 2147483647
>       check.crcs = true
>       request.timeout.ms = 50000
>       heartbeat.interval.ms = 3000
>       auto.commit.interval.ms = 5000
>       receive.buffer.bytes = 65536
>       ssl.truststore.type = JKS
>       ssl.truststore.location = null
>       ssl.keystore.password = null
>       fetch.min.bytes = 1
>       send.buffer.bytes = 131072
>       value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>       group.id = default1
>       retry.backoff.ms = 100
>       ssl.secure.random.implementation = null
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       ssl.trustmanager.algorithm = PKIX
>       ssl.key.password = null
>       fetch.max.wait.ms = 500
>       sasl.kerberos.min.time.before.relogin = 60000
>       connections.max.idle.ms = 540000
>       session.timeout.ms = 30000
>       metrics.num.samples = 2
>       key.deserializer = class 
> org.apache.kafka.common.serialization.StringDeserializer
>       ssl.protocol = TLS
>       ssl.provider = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       ssl.keystore.location = null
>       ssl.cipher.suites = null
>       security.protocol = PLAINTEXT
>       ssl.keymanager.algorithm = SunX509
>       metrics.sample.window.ms = 30000
>       auto.offset.reset = latest
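> One more detail that may matter here (our reading of the kafka010 KafkaRDD
> source; treat the fallback default as an assumption, since it may differ
> across versions): the timeout passed into CachedKafkaConsumer.get comes from
> the Spark conf, not from the consumer config above.
>     // Paraphrased from org.apache.spark.streaming.kafka010.KafkaRDD:
>     private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
>       conf.getTimeAsMs("spark.network.timeout", "120s"))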


