This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 36c24a0 [SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options 36c24a0 is described below commit 36c24a03bd6d00beb65377d37855544f406d1f18 Author: Yuanjian Li <yuanjian...@databricks.com> AuthorDate: Fri Aug 20 10:41:42 2021 +0900 [SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options ### What changes were proposed in this pull request? Add more documents and checking logic for the new options `minOffsetsPerTrigger` and `maxTriggerDelay`. ### Why are the changes needed? Have a clear description of the behavior introduced in SPARK-35312 ### Does this PR introduce _any_ user-facing change? Yes. If the user sets minOffsetsPerTrigger > maxOffsetsPerTrigger, the new code will throw an IllegalArgumentException. The original behavior is to ignore the maxOffsetsPerTrigger silently. ### How was this patch tested? Existing tests. Closes #33792 from xuanyuanking/SPARK-35312-follow. 
Authored-by: Yuanjian Li <yuanjian...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit a0b24019edcd268968a7e0074b0a54988e408699) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- docs/structured-streaming-kafka-integration.md | 15 +++++++++------ .../apache/spark/sql/kafka010/KafkaSourceProvider.scala | 13 +++++++++++++ .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 4 ++++ 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 6926bbb..0ec359f 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -477,23 +477,26 @@ The following configurations are optional: <td>maxOffsetsPerTrigger</td> <td>long</td> <td>none</td> - <td>streaming and batch</td> + <td>streaming query</td> <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td> </tr> <tr> <td>minOffsetsPerTrigger</td> <td>long</td> <td>none</td> - <td>streaming and batch</td> - <td>Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will - be proportionally split across topicPartitions of different volume.</td> + <td>streaming query</td> + <td>Minimum number of offsets to be processed per trigger interval. The specified total number of + offsets will be proportionally split across topicPartitions of different volume. 
Note, if the + maxTriggerDelay is exceeded, a trigger will be fired even if the number of available offsets + doesn't reach minOffsetsPerTrigger.</td> </tr> <tr> <td>maxTriggerDelay</td> <td>time with units</td> <td>15m</td> - <td>streaming and batch</td> - <td>Maximum amount of time for which trigger can be delayed between two triggers provided some data is available from the source.</td> + <td>streaming query</td> + <td>Maximum amount of time for which trigger can be delayed between two triggers provided some + data is available from the source. This option is only applicable if minOffsetsPerTrigger is set.</td> </tr> <tr> <td>minPartitions</td> diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 38803b7..4a75ab0 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -322,6 +322,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister s"Option 'kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}' must be specified for " + s"configuring Kafka consumer") } + + if (params.contains(MIN_OFFSET_PER_TRIGGER) && params.contains(MAX_OFFSET_PER_TRIGGER)) { + val minOffsets = params.get(MIN_OFFSET_PER_TRIGGER).get.toLong + val maxOffsets = params.get(MAX_OFFSET_PER_TRIGGER).get.toLong + if (minOffsets > maxOffsets) { + throw new IllegalArgumentException(s"The value of minOffsetPerTrigger($minOffsets) is " + + s"higher than the maxOffsetsPerTrigger($maxOffsets).") + } + } } private def validateStreamOptions(params: CaseInsensitiveMap[String]) = { @@ -382,6 +391,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister if (params.contains(MIN_OFFSET_PER_TRIGGER)) { logWarning("minOffsetsPerTrigger option ignored in batch queries") 
} + + if (params.contains(MAX_TRIGGER_DELAY)) { + logWarning("maxTriggerDelay option ignored in batch queries") + } } class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index d9fad5e..f61696f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1908,6 +1908,10 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { testBadOptions("assign" -> "")("no topicpartitions to assign") testBadOptions("subscribe" -> "")("no topics to subscribe") testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") + testBadOptions( + "kafka.bootstrap.servers" -> "fake", "subscribe" -> "t", "minOffsetsPerTrigger" -> "20", + "maxOffsetsPerTrigger" -> "15")( + "value of minOffsetPerTrigger(20) is higher than the maxOffsetsPerTrigger(15)") } test("unsupported kafka configs") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org