This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new ad18faa [SPARK-26718][SS][BRANCH-2.3] Fixed integer overflow in SS kafka rateLimit calculation ad18faa is described below commit ad18faa65251aa35437adee49a1767d9b4977d48 Author: ryne.yang <ryne.y...@acuityads.com> AuthorDate: Wed Jan 30 11:13:19 2019 -0800 [SPARK-26718][SS][BRANCH-2.3] Fixed integer overflow in SS kafka rateLimit calculation ## What changes were proposed in this pull request? Fix the integer overflow issue in rateLimit. ## How was this patch tested? Passed Jenkins with a newly added UT for the case where the integer could overflow. Closes #23703 from linehrr/fix/2.3-integeroverflow. Authored-by: ryne.yang <ryne.y...@acuityads.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/kafka010/KafkaSource.scala | 10 ++++++- .../spark/sql/kafka010/KafkaSourceSuite.scala | 35 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 169a5d0..f88c4d5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -188,7 +188,15 @@ private[kafka010] class KafkaSource( val prorate = limit * (size / total) logDebug(s"rateLimit $tp prorated amount is $prorate") // Don't completely starve small topicpartitions - val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong + val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong + // need to be careful of integer overflow + // therefore added canary checks where to see if off variable could be overflowed + // refer to [https://issues.apache.org/jira/browse/SPARK-26718] + val off = if (prorateLong > 
Long.MaxValue - begin) { + Long.MaxValue + } else { + begin + prorateLong + } logDebug(s"rateLimit $tp new offset is $off") // Paranoia, make sure not to return an offset that's past end Math.min(end, off) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index 02c8764..3a27d41 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -176,6 +176,41 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { StopStream) } + test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " + + "during end offset calculation") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + // fill in 5 messages to trigger potential integer overflow + testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0)) + + val partitionOffsets = Map( + new TopicPartition(topic, 0) -> 5L + ) + val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets) + + val kafka = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + // use latest to force begin to be 5 + .option("startingOffsets", startingOffsets) + // use Long.Max to try to trigger overflow + .option("maxOffsetsPerTrigger", Long.MaxValue) + .option("subscribe", topic) + .option("kafka.metadata.max.age.ms", "1") + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 30, 31, 32, 33, 34), + CheckAnswer(30, 31, 32, 33, 34), + StopStream + ) + } + test("maxOffsetsPerTrigger") { val topic = newTopic() testUtils.createTopic(topic, partitions = 3) 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org