[jira] [Commented] (SPARK-26267) Kafka source may reprocess data

2018-12-21 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726973#comment-16726973 ]

ASF GitHub Bot commented on SPARK-26267:


zsxwing opened a new pull request #23365: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
URL: https://github.com/apache/spark/pull/23365
 
 
   ## What changes were proposed in this pull request?
   
   Backport #23324 to branch-2.4.
   
   ## How was this patch tested?
   
   Jenkins




> Kafka source may reprocess data
> -------------------------------
>
>                 Key: SPARK-26267
>                 URL: https://issues.apache.org/jira/browse/SPARK-26267
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Shixiong Zhu
>            Assignee: Shixiong Zhu
>            Priority: Blocker
>              Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source asks for the latest offset, it may
> instead receive the earliest offset. When it then gets the correct latest
> offset in the next batch, it reprocesses messages that have already been
> processed. This usually happens when restarting a streaming query.
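To make the failure mode concrete, here is a minimal sketch (illustrative only, not Spark's code; `checkpointedOffsets` and the literal offsets are made up) of how a "latest" offset that regresses below a checkpointed offset signals KAFKA-7703 rather than real data:

```scala
import org.apache.kafka.common.TopicPartition

// Offsets recorded in the streaming checkpoint for each partition.
val checkpointedOffsets: Map[TopicPartition, Long] =
  Map(new TopicPartition("events", 0) -> 5000L)

// What a fetch affected by KAFKA-7703 might return: the earliest offset (0)
// instead of the true latest offset (say, 5200).
val fetchedLatest: Map[TopicPartition, Long] =
  Map(new TopicPartition("events", 0) -> 0L)

// A healthy "latest" offset can never be below a checkpointed offset, so a
// regression like this indicates a bad fetch, not data to (re)process.
fetchedLatest.foreach { case (tp, latest) =>
  checkpointedOffsets.get(tp).foreach { known =>
    if (latest < known) {
      println(s"Suspicious offset for $tp: latest=$latest < checkpointed=$known")
    }
  }
}
```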






[jira] [Commented] (SPARK-26267) Kafka source may reprocess data

2018-12-21 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726970#comment-16726970 ]

ASF GitHub Bot commented on SPARK-26267:


zsxwing closed pull request #23324: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28fba2fb..02dfb9ca2b95a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
     logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
     val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de674c3c72..b4f042e93a5da 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
         case EarliestOffsetRangeLimit =>
           KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
           kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c724afba..6008794924052 100644
--- 
[jira] [Commented] (SPARK-26267) Kafka source may reprocess data

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721869#comment-16721869 ]

ASF GitHub Bot commented on SPARK-26267:


zsxwing opened a new pull request #23324: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324
 
 
   ## What changes were proposed in this pull request?
   
   Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka 
may return the earliest offset when we request the latest offset. This causes 
Spark to reprocess data that has already been processed.
   
   To reduce the impact of KAFKA-7703, this PR uses the offsets obtained in 
previous batches to audit the result returned by Kafka. If an incorrect offset 
is detected, we retry at most `maxOffsetFetchAttempts` times. For the first 
batch of a new query, since there are no previous offsets, we simply fetch the 
offsets twice and compare. This should greatly reduce the chance of hitting 
KAFKA-7703.
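   A minimal sketch of this audit-and-retry idea (illustrative only, not the 
PR's actual implementation; `fetchFromKafka` and its wiring are invented 
stand-ins, while `maxOffsetFetchAttempts` is the real option named above):

```scala
import scala.annotation.tailrec

import org.apache.kafka.common.TopicPartition

// Fetch the latest offsets, auditing the result against offsets we already
// know. A "latest" offset below a known offset can only be the KAFKA-7703
// regression, so the fetch is retried up to maxOffsetFetchAttempts times.
def fetchLatestAudited(
    fetchFromKafka: () => Map[TopicPartition, Long],  // raw fetch, may be wrong
    knownOffsets: Option[Map[TopicPartition, Long]],  // offsets from prior batches
    maxOffsetFetchAttempts: Int): Map[TopicPartition, Long] = {

  def isIncorrect(fetched: Map[TopicPartition, Long]): Boolean = knownOffsets match {
    case Some(known) =>
      // Any partition whose "latest" offset went backwards marks the fetch as bad.
      fetched.exists { case (tp, off) => known.get(tp).exists(off < _) }
    case None =>
      // First batch of a new query: no reference offsets, so fetch a second
      // time and treat disagreement as suspicious.
      fetched != fetchFromKafka()
  }

  @tailrec
  def attempt(n: Int): Map[TopicPartition, Long] = {
    val fetched = fetchFromKafka()
    if (!isIncorrect(fetched) || n >= maxOffsetFetchAttempts) fetched
    else attempt(n + 1)
  }

  attempt(1)
}
```

   The actual change threads the known offsets into the offset reader as an 
`Option`, which is why the merged diff above changes call sites from 
`fetchLatestOffsets()` to `fetchLatestOffsets(None)` or 
`fetchLatestOffsets(Some(startPartitionOffsets))`.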
   
   ## How was this patch tested?
   
   Jenkins










[jira] [Commented] (SPARK-26267) Kafka source may reprocess data

2018-12-04 Thread Shixiong Zhu (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709169#comment-16709169 ]

Shixiong Zhu commented on SPARK-26267:

KAFKA-7703 only exists in Kafka 1.1.0 and above, so a possible workaround is to 
use an older client version that doesn't have this issue. This doesn't impact 
Spark 2.3.x and below, as those use Kafka 0.10.0.1 by default.
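For example, with an sbt build the client could be pinned below 1.1.0 (a 
hypothetical, untested sketch; compatibility of this override with 
spark-sql-kafka-0-10 must be verified for your Spark version):

```scala
// build.sbt -- hypothetical workaround: force a kafka-clients release that
// predates KAFKA-7703 (introduced in 1.1.0). Verify compatibility before use.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "1.0.2"
```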

> Kafka source may reprocess data
> -------------------------------
>
>                 Key: SPARK-26267
>                 URL: https://issues.apache.org/jira/browse/SPARK-26267
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Shixiong Zhu
>            Priority: Major
>
> Due to KAFKA-7703, when the Kafka source asks for the latest offset, it may
> instead receive the earliest offset. When it then gets the correct latest
> offset in the next batch, it reprocesses messages that have already been
> processed. This usually happens when restarting a streaming query.


