It seems I found the issue. The actual problem is related to backpressure. When I add either of these configs, *spark.streaming.kafka.maxRatePerPartition* or *spark.streaming.backpressure.initialRate* (the value of both is 100), it starts consuming one message per partition per batch. I'm not sure why that's happening.
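For context, my understanding of how these settings should cap a batch (a rough sketch based on the documented semantics; the numbers are the ones from this thread):

// Assumption: per the Spark docs, spark.streaming.kafka.maxRatePerPartition
// caps records per second per partition, so one batch should admit up to
// rate * batchSeconds records per partition.
val maxRatePerPartition = 100 // spark.streaming.kafka.maxRatePerPartition
val batchDurationSec = 5      // batch.duration from the config quoted below
val perPartitionPerBatchCap = maxRatePerPartition * batchDurationSec // = 500 records

So with a rate of 100 and 5-second batches I would expect a cap of roughly 500 records per partition per batch, not 1, which makes this look like a rate-estimation problem rather than a plain rate limit.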
On Thu, Apr 2, 2020 at 8:48 AM Waleed Fateem <waleed.fat...@gmail.com> wrote:

> Well, this is interesting. Not sure if this is the expected behavior. The
> log messages you have referenced are actually printed out by the Kafka
> consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).
>
> That log message belongs to a new feature added starting with Kafka 1.1:
> https://issues.apache.org/jira/browse/KAFKA-6397
>
> I'm assuming then that you're using Spark 2.4?
>
> From Kafka's perspective, when you do a describe on your
> demandIngestion.SLTarget topic, does that look okay? Are all partitions
> available with a valid leader?
>
> The other thing I'm curious about: after you enabled
> spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going back
> to the older group.id, and do you see the same behavior? Was there a
> reason you chose to start reading again from the beginning by using a new
> consumer group rather than sticking to the same consumer group?
>
> In your application, are you manually committing offsets to Kafka?
>
> Regards,
>
> Waleed
>
> On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra <sd.hri...@gmail.com> wrote:
>
>> Hi
>>
>> Our Spark Streaming job was working fine as expected (the number of
>> events to process in a batch). But for some reasons we enabled compaction
>> on the Kafka topic and restarted the job, and after the restart it failed
>> with the error below:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in
>> stage 2.0 (TID 231, 10.34.29.38, executor 4):
>> java.lang.IllegalArgumentException: requirement failed: Got wrong record
>> for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39
>> even after seeking to offset 106847 got offset 199066 instead. If this is a
>> compacted topic, consider enabling
>> spark.streaming.kafka.allowNonConsecutiveOffsets
>> at scala.Predef$.require(Predef.scala:224)
>> at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>>
>> So I added spark.streaming.kafka.allowNonConsecutiveOffsets: true to the
>> Spark config and changed the group name to consume from the beginning.
>> Now the problem is that it reads only one message per partition per
>> batch. So if a topic has 50 partitions, then it reads 50 messages per
>> batch (batch duration is 5 sec).
>>
>> The topic has 1M records and the consumer has a huge lag.
>>
>> Driver log, which shows it fetching one message per partition per batch:
>>
>> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
>> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
>> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
>> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
>> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
>> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
>> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
>> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
>> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
>> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
>> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
>> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
>> 20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
>> 20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
>> 20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
>> 20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
>> 20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
>> 20/03/31 18:27:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211968.
>> 20/03/31 18:27:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211969.
>> 20/03/31 18:27:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211970.
>>
>> Spark config (batch.duration: 5, using Spark Streaming):
>>
>> spark.shuffle.service.enabled: "true"
>> spark.streaming.backpressure.enabled: "true"
>> spark.streaming.concurrentJobs: "1"
>> spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
>> spark.streaming.backpressure.pid.minRate: 1500
>> spark.streaming.backpressure.initialRate: 100
>> spark.streaming.kafka.allowNonConsecutiveOffsets: true
>>
>> Is there any issue in my configuration, or is there something special
>> required for a compacted Kafka topic that I'm missing?
>>
>> Regards
>> Hrishi
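P.S. In case it helps with reproducing this: a minimal sketch of how a job with the settings quoted above would be wired up using the spark-streaming-kafka-0-10 API (the broker address, app name, and the foreachRDD body are placeholders, not our actual code):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf()
  .setAppName("sl-target-consumer") // placeholder name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.pid.minRate", "1500")
  .set("spark.streaming.backpressure.initialRate", "100")
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")

// batch.duration: 5 seconds, as in the config above
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092", // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "pc-nfr-loop-31-march-2020-4",
  "auto.offset.reset" -> "earliest" // new group, so it starts from the beginning
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("demandIngestion.SLTarget"), kafkaParams)
)

// Placeholder action: count records per batch to observe the consumption rate.
stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()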