[ https://issues.apache.org/jira/browse/SPARK-20780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jayadeepj updated SPARK-20780:
------------------------------
    Attachment: streaming_1.png
                streaming_2.png
                tasks_timing_out_3.png

> Spark Kafka10 Consumer Hangs
> ----------------------------
>
>                 Key: SPARK-20780
>                 URL: https://issues.apache.org/jira/browse/SPARK-20780
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
> Spark Streaming Kafka 010
> CDH 5.8.4
> CentOS Linux release 7.2
>            Reporter: jayadeepj
>            Priority: Critical
>         Attachments: streaming_1.png, streaming_2.png, tasks_timing_out_3.png
>
>
> We recently upgraded our streaming app, which uses the direct stream, to Spark 2
> (spark-streaming-kafka-0-10 2.1.0) with Kafka 0.10.0.0 and the 0.10 consumer.
> We see abnormal delays after the application has run for a couple of hours or
> has consumed approximately 5 million records.
> See screenshots 1 & 2.
> There is a sudden jump in processing time from ~15 seconds (usual for this
> app) to ~3 minutes, and from then on the processing time keeps degrading.
> We have seen that the delay is caused by certain tasks taking exactly the
> configured Kafka consumer 'request.timeout.ms'. We tested this by setting the
> timeout property to different values.
> See screenshot 3.
> I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method and
> the subsequent poll(timeout) call in CachedKafkaConsumer.scala are actually
> timing out on some of the partitions without reading data, yet the executor
> logs the task as successfully completed after exactly the timeout duration.
> Note that most other tasks complete successfully within milliseconds. The
> timeout most likely comes from org.apache.kafka.clients.consumer.KafkaConsumer,
> and we did not observe any difference in network latency.
> We have observed this across multiple clusters and multiple apps, with and
> without TLS/SSL.
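The pattern described in the report (a few tasks lasting exactly request.timeout.ms while most finish in milliseconds) can be screened for across many tasks by comparing each task's duration with the configured timeout. This is a minimal sketch, assuming task durations have already been exported as (taskId, durationMs) pairs, for example by hand from the Spark UI. The object TimeoutSuspects, its methods, and the one-second default tolerance are hypothetical and are not part of Spark or the Kafka client.

```scala
// Hypothetical diagnostic helper; not part of Spark or the Kafka client.
object TimeoutSuspects {

  /** True if durationMs lies within toleranceMs of requestTimeoutMs. */
  def nearTimeout(durationMs: Long, requestTimeoutMs: Long, toleranceMs: Long): Boolean =
    math.abs(durationMs - requestTimeoutMs) <= toleranceMs

  /** Ids of tasks whose duration clusters at the consumer request timeout. */
  def suspects(tasks: Seq[(Long, Long)],
               requestTimeoutMs: Long,
               toleranceMs: Long = 1000L): Seq[Long] =
    tasks.collect {
      case (taskId, durationMs) if nearTimeout(durationMs, requestTimeoutMs, toleranceMs) => taskId
    }

  def main(args: Array[String]): Unit = {
    // request.timeout.ms = 50000 in the reported consumer config.
    val requestTimeoutMs = 50000L
    // TID 446288 ran from 10:30:06 to 10:30:56 according to the executor log.
    val tasks = Seq((446288L, 50000L))
    println(suspects(tasks, requestTimeoutMs))
  }
}
```

The one-second default tolerance mirrors the one-second resolution of the executor log timestamps; a tighter tolerance would only make sense with millisecond-precision task durations.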
> Spark 1.6 with the 0-8 consumer seems fine, with consistent performance.
>
> 17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446288
> 17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)
> 17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, partition 0 offsets 776843 -> 779591
> 17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843
> 17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver
> 17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446329
> 17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 (TID 446329)
> 17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 and clearing cache
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 6807
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 6807 took 4 ms
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as values in m
>
> We can see that the timestamps of the "Initial fetch" and "Finished task" log
> statements differ by exactly the timeout duration (50 seconds, matching
> request.timeout.ms = 50000).
> Our consumer config is below.
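The 50-second gap between the "Initial fetch" and "Finished task" lines for TID 446288 can be checked mechanically against request.timeout.ms = 50000 from the consumer config. This is a minimal sketch using only java.time, assuming every log line starts with a timestamp in the yy/MM/dd HH:mm:ss form seen in the excerpt; the object LogGap is hypothetical. Because the log has one-second resolution, the comparison is only accurate to about a second.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

// Hypothetical helper for measuring the time between two executor log lines.
object LogGap {
  // Executor log timestamps look like "17/05/17 10:30:06" (yy/MM/dd HH:mm:ss).
  private val fmt = DateTimeFormatter.ofPattern("yy/MM/dd HH:mm:ss")

  /** Parses the leading 17-character timestamp of a log line. */
  def timestamp(line: String): LocalDateTime =
    LocalDateTime.parse(line.take(17), fmt)

  /** Milliseconds elapsed between the timestamps of two log lines. */
  def gapMs(from: String, to: String): Long =
    Duration.between(timestamp(from), timestamp(to)).toMillis

  def main(args: Array[String]): Unit = {
    val initialFetch =
      "17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843"
    val finished =
      "17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver"
    val requestTimeoutMs = 50000L // request.timeout.ms from the consumer config
    val gap = gapMs(initialFetch, finished)
    // prints: gap = 50000 ms, request.timeout.ms = 50000 ms
    println(s"gap = $gap ms, request.timeout.ms = $requestTimeoutMs ms")
  }
}
```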
> 17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
> 17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     max.partition.fetch.bytes = 1048576
>     bootstrap.servers = [xxxxx.xxx.xxx:9092]
>     ssl.keystore.type = JKS
>     enable.auto.commit = true
>     sasl.mechanism = GSSAPI
>     interceptor.classes = null
>     exclude.internal.topics = true
>     ssl.truststore.password = null
>     client.id =
>     ssl.endpoint.identification.algorithm = null
>     max.poll.records = 2147483647
>     check.crcs = true
>     request.timeout.ms = 50000
>     heartbeat.interval.ms = 3000
>     auto.commit.interval.ms = 5000
>     receive.buffer.bytes = 65536
>     ssl.truststore.type = JKS
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     fetch.min.bytes = 1
>     send.buffer.bytes = 131072
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     group.id = default1
>     retry.backoff.ms = 100
>     ssl.secure.random.implementation = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.trustmanager.algorithm = PKIX
>     ssl.key.password = null
>     fetch.max.wait.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     session.timeout.ms = 30000
>     metrics.num.samples = 2
>     key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.keystore.location = null
>     ssl.cipher.suites = null
>     security.protocol = PLAINTEXT
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     auto.offset.reset = latest



--
This message was sent by
Atlassian JIRA (v6.3.15#6346)