This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ae0592d  [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation
ae0592d is described below

commit ae0592ddf7009934e9a5ee05a06a1cf80e354393
Author: ryne.yang <ryne.y...@acuityads.com>
AuthorDate: Tue Jan 29 12:40:28 2019 -0800

    [SPARK-26718][SS][BRANCH-2.4] Fixed integer overflow in SS kafka rateLimit calculation

    ## What changes were proposed in this pull request?

    Fix the integer overflow issue in rateLimit.

    ## How was this patch tested?

    Pass the Jenkins with newly added UT for the possible case where integer could be overflowed.

    Closes #23652 from linehrr/fix/integer_overflow_rateLimit.

    Authored-by: ryne.yang <ryne.y...@acuityads.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 10 ++++++-
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 10 ++++++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 35 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index b6c8035..1333bc2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -239,7 +239,15 @@ private[kafka010] class KafkaMicroBatchReader(
             val begin = from.get(tp).getOrElse(fromNew(tp))
             val prorate = limit * (size / total)
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // need to be careful of integer overflow
+            // therefore added canary checks where to see if off variable could be overflowed
+            // refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
           }.getOrElse(end)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index d65b3ce..464ad64 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -190,7 +190,15 @@ private[kafka010] class KafkaSource(
             val prorate = limit * (size / total)
             logDebug(s"rateLimit $tp prorated amount is $prorate")
             // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // need to be careful of integer overflow
+            // therefore added canary checks where to see if off variable could be overflowed
+            // refer to [https://issues.apache.org/jira/browse/SPARK-26718]
+            val off = if (prorateLong > Long.MaxValue - begin) {
+              Long.MaxValue
+            } else {
+              begin + prorateLong
+            }
             logDebug(s"rateLimit $tp new offset is $off")
             // Paranoia, make sure not to return an offset that's past end
             Math.min(end, off)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5f05833..34cf335 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -199,6 +199,41 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       StopStream)
   }
 
+  test("SPARK-26718 Rate limit set to Long.Max should not overflow integer " +
+    "during end offset calculation") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    // fill in 5 messages to trigger potential integer overflow
+    testUtils.sendMessages(topic, (0 until 5).map(_.toString).toArray, Some(0))
+
+    val partitionOffsets = Map(
+      new TopicPartition(topic, 0) -> 5L
+    )
+    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      // use latest to force begin to be 5
+      .option("startingOffsets", startingOffsets)
+      // use Long.Max to try to trigger overflow
+      .option("maxOffsetsPerTrigger", Long.MaxValue)
+      .option("subscribe", topic)
+      .option("kafka.metadata.max.age.ms", "1")
+      .load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 30, 31, 32, 33, 34),
+      CheckAnswer(30, 31, 32, 33, 34),
+      StopStream
+    )
+  }
+
   test("maxOffsetsPerTrigger") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org