[ https://issues.apache.org/jira/browse/SPARK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16964249#comment-16964249 ]
Sandish Kumar HN commented on SPARK-29625:
------------------------------------------

[~hyukjin.kwon] It happens randomly, so there is no way to reproduce the exact error on demand. The basic question is: why is Spark trying to reset the offset of the same partition twice? I hope that makes the problem clear.

> Spark Structure Streaming Kafka Wrong Reset Offset twice
> --------------------------------------------------------
>
>                 Key: SPARK-29625
>                 URL: https://issues.apache.org/jira/browse/SPARK-29625
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1
>            Reporter: Sandish Kumar HN
>            Priority: Major
>
> Spark Structured Streaming resets the Kafka offset twice: once to the right offsets, and a second time to very old offsets.
> {code}
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-151 to offset 0.
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-118 to offset 0.
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-85 to offset 0.
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 122677634.
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-19 to offset 0.
> [2019-10-28 19:27:40,013] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 120504922.
> [2019-10-28 19:27:40,153] {bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO ContextCleaner: Cleaned accumulator 810
> {code}
> This causes a data loss issue:
> {code}
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - 19/10/28 19:27:40 ERROR StreamExecution: Query [id = d62ca9e4-6650-454f-8691-a3d576d1e4ba, runId = 3946389f-222b-495c-9ab2-832c0422cbbb] terminated with error
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - java.lang.IllegalStateException: Partition topic-52's offset was changed from 122677598 to 120504922, some data may have been missed.
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - Some data may have been lost because they are not available in Kafka any more; either the
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - data was aged out by Kafka or the topic may have been deleted before all the data in the
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - topic was processed. If you don't want your streaming query to fail on such cases, set the
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - source option "failOnDataLoss" to "false".
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO -
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [2019-10-28 19:27:40,351] {bash_operator.py:128} INFO - at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> {code}
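For context, the check that kills the query lives in KafkaSource.getBatch, as the stack trace above shows: when building a batch, the source compares the newly fetched ending offsets against the offsets the previous batch ended at, and any partition whose offset moved backwards is reported as data loss. A minimal sketch of that validation, simplified from what the stack trace suggests; the TopicPartition case class and method names here are illustrative, not the actual Spark internals:

{code}
// Hypothetical, simplified sketch of the offset-regression check behind the
// IllegalStateException above; names are illustrative, not Spark's internals.
case class TopicPartition(topic: String, partition: Int)

def validateOffsets(
    fromOffsets: Map[TopicPartition, Long],   // where the previous batch ended
    untilOffsets: Map[TopicPartition, Long],  // newly fetched ending offsets
    failOnDataLoss: Boolean): Unit = {
  untilOffsets.foreach { case (tp, newOffset) =>
    fromOffsets.get(tp).foreach { oldOffset =>
      // An offset that moved backwards means Kafka no longer has the records
      // the query expected to read next (aged out, or topic deleted/recreated).
      if (newOffset < oldOffset) {
        val message = s"Partition ${tp.topic}-${tp.partition}'s offset was " +
          s"changed from $oldOffset to $newOffset, some data may have been missed."
        if (failOnDataLoss) throw new IllegalStateException(message)
        else Console.err.println(s"WARN: $message")
      }
    }
  }
}
{code}

The second, older reset in the driver log (topic-52 back to 120504922) is exactly what trips this check; the open question in this ticket is why the consumer position regresses in the first place.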
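Separately, as the error message itself suggests, the query can be kept alive by disabling failOnDataLoss on the source, at the cost of silently skipping the missing records. A minimal reader sketch; the broker address, topic name, and checkpoint path are placeholders:

{code}
// Minimal Structured Streaming Kafka reader with failOnDataLoss disabled, as
// the error message suggests. Broker address, topic name, and checkpoint path
// are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-stream").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "topic")
  // Log a warning instead of failing the query when offsets go backwards.
  .option("failOnDataLoss", "false")
  .load()

df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-stream")
  .start()
  .awaitTermination()
{code}

Note that this only masks the symptom: records between the two offsets are dropped, so it is a workaround, not a fix for the double reset.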