This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ae0592d  [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation
ae0592d is described below

commit ae0592ddf7009934e9a5ee05a06a1cf80e354393
Author: ryne.yang <ryne.y...@acuityads.com>
AuthorDate: Tue Jan 29 12:40:28 2019 -0800

    [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation
    
    ## What changes were proposed in this pull request?
    
    Fix the integer overflow issue in rateLimit.
    
    ## How was this patch tested?
    
    Passes Jenkins with a newly added unit test covering the case where the integer could overflow.
    
    Closes #23652 from linehrr/fix/integer_overflow_rateLimit.
    
    Authored-by: ryne.yang <ryne.y...@acuityads.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 10 ++++++-
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 10 ++++++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 35 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index b6c8035..1333bc2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -239,7 +239,15 @@ private[kafka010] class KafkaMicroBatchReader(
             val begin = from.get(tp).getOrElse(fromNew(tp))
             val prorate = limit * (size / total)
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Guard against Long overflow when adding the prorated amount to begin:
+            // if the sum would exceed Long.MaxValue, clamp off to Long.MaxValue instead.
+            // See https://issues.apache.org/jira/browse/SPARK-26718
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
           }.getOrElse(end)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d65b3ce..464ad64 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -190,7 +190,15 @@ private[kafka010] class KafkaSource(
             val prorate = limit * (size / total)
             logDebug(s"rateLimit $tp prorated amount is $prorate")
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Guard against Long overflow when adding the prorated amount to begin:
+            // if the sum would exceed Long.MaxValue, clamp off to Long.MaxValue instead.
+            // See https://issues.apache.org/jira/browse/SPARK-26718
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             logDebug(s"rateLimit $tp new offset is $off")
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5f05833..34cf335 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -199,6 +199,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       StopStream)
   }
 
+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+    "during end offset calculation") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    // fill in 5 messages to trigger potential integer overflow
+    testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> 5L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      // start from explicit offset 5 (after the 5 pre-filled messages) to force begin to be 5
+      .option("startingOffsets", startingOffsets)
+      // use Long.MaxValue to try to trigger overflow
+      .option("maxOffsetsPerTrigger", Long.MaxValue)
+      .option("subscribe", topic)
+      .option("kafka.metadata.max.age.ms", "1")
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34),
+      CheckAnswer(30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
   test("maxOffsetsPerTrigger") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to