[GitHub] spark issue #23060: [SPARK-26092][SS]Use CheckpointFileManager to write the ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/23060 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223479786 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat + +## Recovery Semantics after Changes in a Streaming Query +There are limitations on what changes in a streaming query are allowed between restarts from the +same checkpoint location. Here are a few kinds of changes that are either not allowed, or +the effect of the change is not well-defined. For all of them: + +- The term *allowed* means you can do the specified change but whether the semantics of its effect + are well-defined depends on the query and the change. + +- The term *not allowed* means you should not do the specified change as the restarted query is likely + to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset + generated with sparkSession.readStream. + +**Types of changes** + +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed. + +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics + of the change are well-defined depends on the source and the query. Here are a few examples. + + - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)` + + - Changes to subscribed topics/files are generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")` + +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks + are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
+ + - File sink to Kafka sink is allowed. Kafka will see only the new data. + + - Kafka sink to file sink is not allowed. + + - Kafka sink changed to foreach, or vice versa, is allowed. + +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of + the change are well-defined depends on the sink and the query. Here are a few examples. + + - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")` + + - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")` + + - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code. + +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example: + + - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`. + + - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`. + + - Changes in projections with a different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`. + +- *Changes in stateful operations*: Some operations in streaming queries need to maintain + state data in order to continuously update the result. Structured Streaming automatically checkpoints + the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart.
--- End diff -- replaced with HDFS
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223479414 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2<Dataset<String>, Long>() { +public void call(Dataset<String> dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then unpersist it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.unpersist() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch(), you can apply some of these operations on each micro-batch output.
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or +you are using continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `For
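The note in the quoted diff says the `batchId` passed to the `foreachBatch` function can be used to deduplicate output. Here is a minimal sketch of that idea in Python, assuming a hypothetical sink (`IdempotentBatchSink`, not a Spark API) that remembers which batch IDs it has already committed and ignores replays. In a real system the data and the committed batch ID would have to be stored atomically (for example, in one transaction). This in-memory version only shows the control flow.

```python
class IdempotentBatchSink:
    """Hypothetical in-memory sink that deduplicates micro-batches by batch ID.

    A real sink would persist `committed_batch_ids` together with the data in
    one atomic commit; this sketch keeps both in memory for illustration.
    """

    def __init__(self):
        self.committed_batch_ids = set()
        self.rows = []

    def write_batch(self, rows, batch_id):
        """Write `rows` for `batch_id` unless that batch was already committed.

        Returns True if the rows were written, False if the call was a replay.
        """
        if batch_id in self.committed_batch_ids:
            # With at-least-once execution a batch may be re-run after a failure;
            # skipping already-committed IDs turns the replay into a no-op.
            return False
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


sink = IdempotentBatchSink()
sink.write_batch(["a", "b"], batch_id=0)
sink.write_batch(["a", "b"], batch_id=0)  # replay of batch 0 is ignored
sink.write_batch(["c"], batch_id=1)
print(sink.rows)  # ['a', 'b', 'c']
```

With PySpark, a function such as `lambda df, batch_id: sink.write_batch(df.collect(), batch_id)` could be passed to `foreachBatch`, although collecting to the driver is only reasonable for small outputs.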
[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22627 @holdenk Yeah, I intend to backport this to 2.4.
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223456294 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat + +## Recovery Semantics after Changes in a Streaming Query +There are limitations on what changes in a streaming query are allowed between restarts from the +same checkpoint location. Here are a few kinds of changes that are either not allowed, or +the effect of the change is not well-defined. For all of them: + +- The term *allowed* means you can do the specified change but whether the semantics of its effect + are well-defined depends on the query and the change. + +- The term *not allowed* means you should not do the specified change as the restarted query is likely + to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset + generated with sparkSession.readStream. + +**Types of changes** + +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed. + +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics + of the change are well-defined depends on the source and the query. Here are a few examples. + + - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)` + + - Changes to subscribed topics/files are generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")` + +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks + are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
+ + - File sink to Kafka sink is allowed. Kafka will see only the new data. + + - Kafka sink to file sink is not allowed. + + - Kafka sink changed to foreach, or vice versa, is allowed. + +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of + the change are well-defined depends on the sink and the query. Here are a few examples. + + - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")` + + - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")` + + - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code. + +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example: + + - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`. + + - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`. --- End diff -- Right.
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223456079 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2<Dataset<String>, Long>() { +public void call(Dataset<String> dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then unpersist it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.unpersist() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch(), you can apply some of these operations on each micro-batch output.
However, you will have to reason about the end-to-end semantics of doing that operation yourself. --- End diff -- Yes. I missed a few, and I want to fix them all.
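The "write to multiple locations" outline in the quoted diff can be sketched in Python as a `foreachBatch` function. `persist()`, `unpersist()`, and `write.format(...).mode(...).save(...)` are standard PySpark `DataFrame` calls. The formats and output paths are placeholders. Wrapping the writes in `try`/`finally` is a choice made for this sketch, so that the cached data is released even if a write fails. The small recording stand-ins (`_RecordingBatchDF` and `_RecordingWriter`, both hypothetical) let the function run without a Spark cluster.

```python
def write_to_two_locations(batch_df, batch_id):
    """foreachBatch function: cache the micro-batch once, then write it twice."""
    batch_df.persist()  # avoid recomputing the batch for the second write
    try:
        # Placeholder formats and paths; substitute real ones.
        batch_df.write.format("parquet").mode("append").save("/tmp/out/location1")
        batch_df.write.format("json").mode("append").save("/tmp/out/location2")
    finally:
        batch_df.unpersist()  # release the cache even if a write failed


class _RecordingWriter:
    """Minimal stand-in for a DataFrameWriter that records save() calls."""

    def __init__(self, log):
        self._log = log
        self._fmt = None

    def format(self, fmt):
        self._fmt = fmt
        return self

    def mode(self, mode):
        return self

    def save(self, path):
        self._log.append(("save", self._fmt, path))


class _RecordingBatchDF:
    """Minimal stand-in for a micro-batch DataFrame, for running without Spark."""

    def __init__(self):
        self.log = []

    def persist(self):
        self.log.append("persist")
        return self

    def unpersist(self):
        self.log.append("unpersist")
        return self

    @property
    def write(self):
        return _RecordingWriter(self.log)


fake = _RecordingBatchDF()
write_to_two_locations(fake, batch_id=0)
print(fake.log)
```

With a real streaming DataFrame, the function would be registered as `streamingDF.writeStream.foreachBatch(write_to_two_locations).start()`.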
[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22627 @zsxwing
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/22627 [SPARK-25639] [DOCS] Added docs for foreachBatch, python foreach and multiple watermarks ## What changes were proposed in this pull request? Added - Python foreach - Scala, Java and Python foreachBatch - Multiple watermark policy - The semantics of what changes are allowed to the streaming query between restarts. ## How was this patch tested? No tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-25639 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22627.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22627 commit f61c13ef0d4711a04b2774934641f7a4ac690165 Author: Tathagata Das Date: 2018-10-04T10:33:47Z Added docs
[GitHub] spark issue #22507: [SPARK-25495][SS]FetchedData.reset should reset all fiel...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22507 LGTM. Thanks for the explanation.
[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220061053 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { +val topic = newTopic() +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic, partitions = 1) + +val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + +testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +} +testUtils.waitUntilOffsetAppears(topicPartition, 5) + +val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { +// Post more messages to Kafka so that the executors will fetch messages in the next batch +// and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` --- End diff -- Make it clear that you want to send more messages *before* the tasks of the current batch start reading the current batch data, so that the executors ... Also, I am not entirely sure how it causes `fetchedData.reset()`, thus creating the issue. Are you sure this fails without your fix?
[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220059919 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { +val topic = newTopic() +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic, partitions = 1) + +val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + +testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +} +testUtils.waitUntilOffsetAppears(topicPartition, 5) + +val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { +// Post more messages to Kafka so that the executors will fetch messages in the next batch +// and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` --- End diff -- What do you mean by "drop them"?
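Why the test above waits for offset 5 after sending only four records: with a transactional producer, each committed transaction also appends a commit marker (a control record) that occupies one offset in the partition, and `read_committed` consumers skip it. This is also why the offsets seen by the source are not necessarily consecutive. The helper below is a hypothetical sketch (not part of `KafkaTestUtils`) that computes the resulting offset layout for one partition. It assumes a single producer, committed transactions only, and exactly one marker per transaction. Under those assumptions the end offset after the test's single four-record transaction is 5, which is consistent with `waitUntilOffsetAppears(topicPartition, 5)`.

```python
def transactional_log_layout(records_per_txn):
    """Return (data_offsets, marker_offsets, log_end_offset) for one partition.

    Assumes committed transactions written back-to-back by a single producer,
    each followed by exactly one commit marker that occupies one offset.
    """
    offset = 0
    data_offsets, marker_offsets = [], []
    for n in records_per_txn:
        data_offsets.extend(range(offset, offset + n))  # user records
        offset += n
        marker_offsets.append(offset)  # commit marker (control record)
        offset += 1
    return data_offsets, marker_offsets, offset


# One transaction of four records, as in `(0 to 3).foreach { ... }` in the test.
data, markers, end = transactional_log_layout([4])
print(data, markers, end)  # [0, 1, 2, 3] [4] 5
```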
[GitHub] spark issue #22476: [SPARK-24157][SS][FOLLOWUP] Rename to spark.sql.streamin...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22476 LGTM. Please make sure to merge it to 2.4.
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22386 LGTM. Just one super nit.
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22386#discussion_r216781599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala --- @@ -74,9 +75,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag]( // If we're in continuous processing mode, we should get the store version for the current // epoch rather than the one at planning time. -val currentVersion = EpochTracker.getCurrentEpoch match { - case None => storeVersion - case Some(value) => value +val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING)) + .map(_.toBoolean) +val currentVersion = if (isContinuous.contains(true)) { --- End diff -- Super nit: this looks weird. Rather, I would change the previous line to `val isContinuous = ... .map(_.toBoolean).getOrElse(false)`, so that this line can simply test `if (isContinuous)`.
[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22293 I was afraid that this would be flaky. Glad you found a solution quickly. Just one comment to improve code readability.
[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22293#discussion_r214208589 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -652,62 +654,67 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } } +val topicPartition = new TopicPartition(topic, 0) // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( StartStream(ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), -WithOffsetSync(topic) { () => +WithOffsetSync(topicPartition) { () => --- End diff -- It's weird to have a hanging "5" in the thunk. Rather, take the expected offset as a parameter of `WithOffsetSync`. That is, `WithOffsetSync(topicPartition, expectedOffset = 5) {`
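The review suggestion is to pass the expected offset explicitly instead of returning a bare `5` from the thunk. Here is a rough sketch of that shape in Python: `with_offset_sync` is a hypothetical helper (not the Spark test API) that runs an action and then polls until the partition's latest offset equals `expected_offset`, raising `TimeoutError` if it never does. `get_latest_offset` is an assumed callback standing in for a real Kafka offset lookup.

```python
import time


def with_offset_sync(get_latest_offset, expected_offset, action,
                     timeout_s=10.0, poll_interval_s=0.01):
    """Run `action`, then wait until `get_latest_offset()` == `expected_offset`.

    Returns the observed offset; raises TimeoutError if it never matches.
    """
    action()
    deadline = time.monotonic() + timeout_s
    while True:
        latest = get_latest_offset()
        if latest == expected_offset:
            return latest
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"latest offset {latest} != expected {expected_offset} "
                f"after {timeout_s}s")
        time.sleep(poll_interval_s)


# Simulated partition: four records plus one commit marker, so the end offset is 5.
partition_log = []
observed = with_offset_sync(
    get_latest_offset=lambda: len(partition_log),
    expected_offset=5,
    action=lambda: partition_log.extend(["r0", "r1", "r2", "r3", "commit"]),
)
print(observed)  # 5
```

Making `expected_offset` a named parameter keeps the action focused on producing data, and the expectation stays visible at the call site.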
[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22042 LGTM.
[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22230 LGTM.
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212706003 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failOnDataLoss=false" case with missing offsets. + * + * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records.
Hence each class extending this trait needs to wait at least 30 seconds (or even longer + when running on a slow Jenkins machine) before records start to be removed. To make sure a test + does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + not 0 rather than sleeping for a hard-coded duration. + */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { +// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic +new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { +val props = super.brokerConfiguration +// Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded +// 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at +// least 30 seconds. +props.put("log.cleaner.backoff.ms", "100") +// The size of RecordBatch V2 increases to support transactional write. +props.put("log.segment.bytes", "70") +props.put("log.retention.bytes", "40") +props.put("log.retention.check.interval.ms", "100") +props.put("delete.retention.ms", "10") +props.put("log.flush.scheduler.interval.ms", "10") +props + } +} +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+ */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( + testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray) +
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212706908 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failOnDataLoss=false" case with missing offsets. + * + * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records.
Hence each class extending this trait needs to wait at least 30 seconds (or even longer + * when running on a slow Jenkins machine) before records start to be removed. To make sure a test + * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + * not 0 rather than sleeping a hard-code duration. + */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { +// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic +new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { +val props = super.brokerConfiguration +// Try to make Kafka clean up messages as fast as possible. However, there is a hard-code +// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at +// least 30 seconds. +props.put("log.cleaner.backoff.ms", "100") +// The size of RecordBatch V2 increases to support transactional write. +props.put("log.segment.bytes", "70") +props.put("log.retention.bytes", "40") +props.put("log.retention.check.interval.ms", "100") +props.put("delete.retention.ms", "10") +props.put("log.flush.scheduler.interval.ms", "10") +props + } +} +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table. 
+ */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( + testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) +testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray) +
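The trait's doc comment recommends polling the earliest offset in `eventually` until it is no longer 0, rather than sleeping for a hard-coded duration. The suite itself uses ScalaTest's `eventually`. The sketch below is a dependency-free stand-in for that idea. `RetryUntil.waitUntil` is a hypothetical helper, and the "earliest offset" supplier in the usage is simulated, not a real Kafka call.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for ScalaTest's `eventually`: re-run `check` every
// `intervalMs` until it returns true, or give up once `timeoutMs` has elapsed.
object RetryUntil {
  def waitUntil(timeoutMs: Long, intervalMs: Long)(check: () => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Boolean =
      if (check()) true                                      // condition met
      else if (System.currentTimeMillis() >= deadline) false // timed out
      else {
        Thread.sleep(intervalMs)                             // back off, then retry
        loop()
      }

    loop()
  }
}
```

In a test, `check` would ask the broker for the partition's earliest offset and compare it with 0. This returns as soon as retention has actually removed records instead of always paying the worst-case wait.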
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212706859 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212526645 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22207#discussion_r212526373 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212507190 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { +def apply( +topic: String, +producer: KafkaProducer[String, String])( --- End diff -- Ping on this comment. Maybe you missed this?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212522664 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { --- End diff -- nit: Unlike most `With***` methods, this does not create a KafkaProducer. Its purpose is to force synchronization of the consumer, so maybe rename it to `WithOffsetSync { ... }`?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212521083 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the fetched record for the given offset if available. + * + * If the record is invisible (either a transaction message, or an aborted message when the + * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the + * next offset to fetch. + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this + * method will return `null` if the next available record is within [offset, untilOffset). * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. */ - private def fetchData( + private def fetchRecord( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) -} - -if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. 
TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { -throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) +} else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. + if (offset < fetchedData.offsetAfterPoll) { +// Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask +// the next call to start from `fetchedData.offsetAfterPoll`. +fetchedData.reset() +return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { -throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") +// Fetch records from Kafka and update `fetchedData`. +fetchData(offset, pollTimeoutMs) } +} + +if (!fetchedData.hasNext) { + // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still + // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a + // record to ask the next call to start from `fetchedData.offsetAfterPoll`. + assert(offset <= fetchedData.offsetAfterPoll, +s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}") + fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { val record = fetchedData.next() - nextOffsetInFetchedData = record.offset + 1 // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. if (record.offset > offset) { +val range = getAvailableOffsetRange() +if (range.earliest <= offset) { + // `offset` is still valid but the corresponding message is invisible. 
We should skip it + // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of + // `fetchRecord` can just return `record` directly. + fetchedData.previous() + return fet
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212522432 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { +def apply( +topic: String, +producer: KafkaProducer[String, String])( +func: KafkaProducer[String, String] => Unit): AssertOnQuery = { --- End diff -- nit: AssertOnQuery -> StreamAction
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r212504622 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala --- @@ -331,6 +331,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer) private val rangeToRead = resolveRange(offsetRange) + --- End diff -- unnecessary
[GitHub] spark issue #22191: [SPARK-25204][SS] Fix race in rate source test.
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22191 LGTM.
[GitHub] spark pull request #22182: [SPARK-25184][SS] Fixed race condition in StreamE...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/22182

[SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState

## What changes were proposed in this pull request?

The race condition that caused the test failure is between two threads:

- The MicroBatchExecution thread, which processes inputs to produce answers and then generates progress events.
- The test thread, which generates some input data, checks the answer, and then verifies the progress event generated by the query.

The synchronization structure between these threads is as follows:

1. The MicroBatchExecution thread does the following, in order, in every batch:
   a. Processes the batch input to generate the answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for progress using `awaitOffset`.
   c. Generates the progress event.
2. The test execution thread:
   a. Calls `awaitOffset` to wait for progress, which waits on `awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, moves on in the test to check the answer.
   c. Finally, verifies the last generated progress event.

The following sequence of events can occur: 2a -> 1a -> 1b -> 2b -> 2c -> 1c. In other words, the progress event may be generated after the test tries to verify it.

The solution has two steps:

1. Signal the waiting thread after the progress event has been generated, that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a large value.

The latter ensures that the test thread keeps waiting on `awaitProgressLockCondition` until the MicroBatchExecution thread explicitly signals it. With the existing small timeout of 100 ms, the following sequence can occur:

- The MicroBatchExecution thread updates the committed offsets.
- The test thread waiting on `awaitProgressLockCondition` accidentally times out after 100 ms, finds that the committed offsets have been updated, and therefore returns from `awaitOffset` and moves on to the progress event tests.
- The MicroBatchExecution thread then generates the progress event and signals, but the test thread has already attempted to verify the event and failed.

Increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 seconds`, similar to `awaitInitialization`) avoids this type of race condition as well.

## How was this patch tested?

Ran locally many times.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-25184 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22182 commit 319990ff60ad7b6fad6fd0cea5cada0b22e3f3c9 Author: Tathagata Das Date: 2018-08-22T04:44:59Z [SC-12136][SS][HOTFIX] Fixed race condition in StreamExecution that caused flaky test in FlatMapGroupsWithState
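The core of the fix is ordering: waiters must be woken only after everything they will check, including the progress event, exists. The waiter must also tolerate early wake-ups. Below is a hypothetical miniature of that idea built on `java.util.concurrent.locks`, not Spark's `StreamExecution`. It goes a step beyond the PR's two changes: it publishes the waited-on predicate (`completedBatch`) under the same lock, together with the progress event. A timeout or spurious wake-up therefore cannot observe committed state without the matching event. The PR itself instead signals after `finishTrigger()` and lengthens the await timeout.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of the producer/waiter ordering; not Spark's StreamExecution.
final class MiniStream {
  private val lock = new ReentrantLock()
  private val progressCondition = lock.newCondition()
  private var completedBatch = -1L                     // guarded by `lock`
  private val progressEvents = ArrayBuffer.empty[Long] // guarded by `lock`

  /** Runs on the "micro-batch" thread, once per batch. */
  def runBatch(batchId: Long): Unit = {
    // (a) process the batch input and produce the answer (elided in this sketch)
    lock.lock()
    try {
      progressEvents += batchId     // (c) generate the progress event first ...
      completedBatch = batchId
      progressCondition.signalAll() // (b) ... and only then wake up waiters
    } finally lock.unlock()
  }

  /** Runs on the test thread; returns false if `timeoutMs` elapses first. */
  def awaitBatch(batchId: Long, timeoutMs: Long): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    lock.lock()
    try {
      // Re-check the predicate after every wake-up, whether signaled, timed out, or spurious.
      while (completedBatch < batchId) {
        val remaining = deadline - System.nanoTime()
        if (remaining <= 0) return false
        progressCondition.awaitNanos(remaining)
      }
      true
    } finally lock.unlock()
  }

  def lastProgress: Option[Long] = {
    lock.lock()
    try progressEvents.lastOption finally lock.unlock()
  }
}
```

Once `awaitBatch` returns true, the progress event for that batch is guaranteed to be visible, which is exactly the property the flaky test needed.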
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211805733 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. 
+ producer.beginTransaction() + (6 to 10).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (11 to 15).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (
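The offset annotations in this test (`offset: 3, 4, 5*`, where `*` marks a message that is not a committed data message) rely on the fact that each transaction's commit or abort marker occupies one offset. The pure-Scala model below reproduces which values each batch should expose with `maxOffsetsPerTrigger = 3`. It involves no Kafka or Spark API, and `Entry`, `Data`, `ControlMarker`, and `txn` are hypothetical names. The log is three transactions: values 1 to 5 committed, 6 to 10 aborted, and 11 to 15 committed.

```scala
object ReadCommittedBatches {
  sealed trait Entry
  // A data record; `committed` is false for records of an aborted transaction.
  final case class Data(value: Int, committed: Boolean) extends Entry
  // A transaction commit/abort marker: it takes up an offset but carries no data.
  case object ControlMarker extends Entry

  // One transaction: its data records followed by its commit or abort marker.
  def txn(values: Range, commit: Boolean): Vector[Entry] =
    values.toVector.map(v => Data(v, commit)) :+ ControlMarker

  // Cut the log (index = offset) into batches of at most `maxOffsets` offsets and keep
  // only the values a `read_committed` consumer would see in each batch.
  def visiblePerBatch(log: Vector[Entry], maxOffsets: Int): Vector[Vector[Int]] =
    log.grouped(maxOffsets).map(_.collect { case Data(v, true) => v }).toVector
}
```

The model yields batches of `[1, 2, 3]`, `[4, 5]`, two empty batches, `[11, 12, 13]`, and `[14, 15]`. These accumulate to the `CheckAnswer` values above, and the empty batches are why the test sets a short `kafkaConsumer.pollTimeoutMs`.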
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211801632 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -346,11 +437,40 @@ private[kafka010] case class InternalKafkaConsumer( consumer.seek(topicPartition, offset) } - private def poll(pollTimeoutMs: Long): Unit = { + /** + * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`. + * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible + * messages (either transaction messages, or aborted messages when `isolation.level` is + * `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. + */ + private def poll(offset: Long, pollTimeoutMs: Long): Unit = { --- End diff -- Maybe rename this method so that its name reflects that it also fetches data.
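The `poll` doc comment above distinguishes three outcomes that all start from "poll returned", and it uses the consumer position to tell them apart. Records came back; or nothing visible came back but the position moved (every fetched message was invisible); or the position did not move (nothing was polled before the timeout). The sketch below models only that classification as a pure function. `PollOutcome` and its cases are hypothetical names, and the `OffsetOutOfRangeException` path is not modeled.

```scala
// Hypothetical classification of a single poll, following the doc comment's rules.
object PollOutcomeModel {
  sealed trait PollOutcome
  final case class Fetched(offsets: Seq[Long], offsetAfterPoll: Long) extends PollOutcome
  final case class AllInvisible(offsetAfterPoll: Long) extends PollOutcome
  case object TimedOut extends PollOutcome

  /**
   * @param requested         the offset the consumer was positioned at before polling
   * @param returned          offsets of the visible records the poll returned
   * @param positionAfterPoll the consumer position after the poll
   */
  def classify(requested: Long, returned: Seq[Long], positionAfterPoll: Long): PollOutcome =
    if (returned.nonEmpty) Fetched(returned, positionAfterPoll)
    else if (positionAfterPoll > requested) AllInvisible(positionAfterPoll) // skip the gap
    else TimedOut // position unchanged: the doc comment says a TimeoutException is thrown
}
```

Keeping `AllInvisible` separate from `TimedOut` is what lets the reader make progress through long runs of transaction markers or aborted records without reporting a spurious timeout.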
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211802112 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) -} - -if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { -throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + poll(offset, pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < fetchedData.offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. --- End diff -- "skip them" is confusing. What does it mean to skip? Why are we still returning something?
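One reading of "skip" in this diff, sketched under assumptions: when `[offset, offsetAfterPoll)` holds no visible records, the consumer returns no record plus the offset where the gap ends, and the caller resumes there instead of at `offset + 1`. `Fetched`, `readRange`, and `fetchFromExample` are hypothetical names, not the code under review.

```scala
object SkipSketch {
  // Hypothetical result of one fetch: a visible value, or None, plus the offset to
  // continue from. None means [requested offset, nextOffsetToFetch) held nothing visible.
  final case class Fetched(value: Option[String], nextOffsetToFetch: Long)

  /** Collects every visible value in [fromOffset, untilOffset), jumping over gaps.
    * Assumes `fetch` always returns an offset greater than the one requested. */
  def readRange(fromOffset: Long, untilOffset: Long)(fetch: Long => Fetched): List[String] = {
    val out = List.newBuilder[String]
    var next = fromOffset
    while (next < untilOffset) {
      val fetched = fetch(next)
      fetched.value.foreach(v => out += v)
      // "Skipping" means resuming at nextOffsetToFetch rather than at next + 1.
      next = fetched.nextOffsetToFetch
    }
    out.result()
  }

  // Example partition: offsets 2..5 hold only markers or aborted records.
  val visible: Map[Long, String] = Map(0L -> "a", 1L -> "b", 6L -> "c")

  def fetchFromExample(offset: Long): Fetched =
    if (visible.contains(offset)) Fetched(visible.get(offset), offset + 1)
    else {
      // Resume at the next visible offset, or stop if there is none.
      val resume = visible.keys.filter(_ > offset).reduceOption(_ min _).getOrElse(Long.MaxValue)
      Fetched(None, resume)
    }
}
```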
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211801676 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) -} - -if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { -throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + poll(offset, pollTimeoutMs) --- End diff -- Add a comment noting that this method updates `fetchedData`.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211801968 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) -} - -if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { -throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + poll(offset, pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. --- End diff -- nit: I was confused about whether the above comment applies to the `else if` above it or to the `if` below it. Maybe inline it with the `else if`, or leave a blank line after it, before the `if` below.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211795985 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -80,6 +90,72 @@ private[kafka010] case class InternalKafkaConsumer( kafkaParams: ju.Map[String, Object]) extends Logging { import InternalKafkaConsumer._ + /** + * The internal object to store the fetched data from Kafka consumer and the next offset to poll. + * + * @param records the pre-fetched Kafka records. + * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should + *check if the pre-fetched data is still valid. + * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll + *when `records` is drained. + */ + private case class FetchedData( + private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + var nextOffsetInFetchedData: Long, --- End diff -- Make this a public getter with a private setter.
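Scala has no syntax for giving a `var`'s getter and setter different visibility, so one common way to get a public getter with a private setter is a private backing field plus a public `def`. The sketch below is a hypothetical, trimmed-down `FetchedData` written as a plain class; it is not the class from the PR, and `markConsumed` is an illustrative name.

```scala
object GetterSketch {
  // Hypothetical, trimmed-down FetchedData: any caller can read the offset, but only
  // code inside this class can assign it.
  final class FetchedData(initialNextOffset: Long) {
    private var _nextOffsetInFetchedData: Long = initialNextOffset

    // Public getter, no public setter.
    def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData

    // Mutation happens only through methods that say why the offset changes.
    def markConsumed(recordOffset: Long): Unit = {
      _nextOffsetInFetchedData = recordOffset + 1
    }
  }
}
```

Outside the class, an assignment such as `data.nextOffsetInFetchedData = 5L` no longer compiles.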
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211805275 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. 
+ producer.beginTransaction() + (6 to 10).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (11 to 15).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 +AdvanceManualClock(100), +wait
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211801549 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, --- End diff -- Maybe rename this method to `fetchRecord` to make it consistent with its return type.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211804454 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => --- End diff -- Use `Execute`, and add a comment on what this does and why we need it.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211802489 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -288,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer( null --- End diff -- We should NEVER return null when we are already using `FetchedRecord.record = null` to signify the lack of a record.
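The concern is having two different ways to say "no record". The diff in this thread returns a reused `fetchedRecord` via `withRecord(null, ...)`, so null is already the in-band signal there; in plain Scala, one way to keep a single, explicit signal is an `Option`. This is a hypothetical sketch (`Record`, `FetchedRecord`, and `describe` are illustrative), and the actual code may have reasons, such as avoiding per-record allocation, to keep a reused mutable object instead.

```scala
object OptionRecordSketch {
  // Hypothetical simplified record standing in for ConsumerRecord.
  final case class Record(offset: Long, value: String)

  // Absence is encoded in the type, so there is exactly one way to say "no record"
  // and the compiler forces callers to handle it.
  final case class FetchedRecord(record: Option[Record], nextOffsetToFetch: Long)

  def describe(f: FetchedRecord): String = f.record match {
    case Some(r) => s"record at ${r.offset}, continue at ${f.nextOffsetToFetch}"
    case None    => s"no visible record, continue at ${f.nextOffsetToFetch}"
  }
}
```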
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211801254 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. + failOnDataLoss: Boolean): FetchedRecord = { +if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) -} - -if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { -throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + poll(offset, pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < fetchedData.offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +fetchedData.reset() +return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { -throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") +poll(offset, pollTimeoutMs) } +} + +if (!fetchedData.hasNext) { + assert(offset <= fetchedData.offsetAfterPoll, --- End diff -- Add comments here on what this case means. 
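A comment for this branch could spell out the two cases that the assertion leaves open. The sketch below encodes them, assuming (as the `poll` Scaladoc in this thread states) that an unchanged position after polling means nothing was fetched before the timeout; `Outcome`, `SkipTo`, `NothingFetched`, and `classify` are hypothetical names.

```scala
object EmptyPollSketch {
  sealed trait Outcome
  // [offset, offsetAfterPoll) contained only invisible entries (markers or aborted records),
  // so reading can continue at offsetAfterPoll.
  final case class SkipTo(offsetAfterPoll: Long) extends Outcome
  // The position did not move: nothing was fetched (for example, a timeout or an
  // out-of-range offset), which the caller has to handle separately.
  case object NothingFetched extends Outcome

  def classify(offset: Long, offsetAfterPoll: Long): Outcome = {
    require(offset <= offsetAfterPoll,
      s"offsetAfterPoll ($offsetAfterPoll) must not be behind offset ($offset)")
    if (offset < offsetAfterPoll) SkipTo(offsetAfterPoll) else NothingFetched
  }
}
```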
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211804879 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, --- End diff -- Why is this `waitUntilBatchProcessed` needed? `CheckAnswer` waits for the batch to complete anyway.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211803267 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -337,6 +338,7 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { nextRow = converter.toUnsafeRow(record) +nextOffset = record.offset + 1 --- End diff -- Why this change?
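A plausible motivation for the change, shown as a hypothetical sketch: if `consumer.get` can return a record whose offset is later than the one requested (because the offsets in between were not visible), then advancing by one from the requested offset keeps returning the same record. `Record`, `getAtOrAfter`, and `readAll` are illustrative names, not the actual reader code.

```scala
object NextOffsetSketch {
  final case class Record(offset: Long, value: String)

  // Hypothetical visible records; offsets 2..5 are markers or aborted records.
  val visible: Vector[Record] = Vector(Record(0L, "a"), Record(1L, "b"), Record(6L, "c"))

  // Assumed behavior: returns the first visible record at or after `offset`.
  def getAtOrAfter(offset: Long): Option[Record] = visible.find(_.offset >= offset)

  /** Reads [from, until); `advance` computes the next offset from the requested one
    * and the record actually returned. */
  def readAll(from: Long, until: Long)(advance: (Long, Record) => Long): List[String] = {
    val out = List.newBuilder[String]
    var nextOffset = from
    var done = false
    while (!done && nextOffset < until) {
      getAtOrAfter(nextOffset) match {
        case Some(r) if r.offset < until =>
          out += r.value
          nextOffset = advance(nextOffset, r)
        case _ =>
          done = true
      }
    }
    out.result()
  }

  // Assumes consecutive offsets: re-reads record 6 while stepping through the gap.
  val naive: (Long, Record) => Long = (requested, _) => requested + 1
  // Continues right after the record that was actually returned.
  val fromRecord: (Long, Record) => Long = (_, r) => r.offset + 1
}
```

Reading `[0, 7)` with `fromRecord` yields `a, b, c`, while `naive` yields `c` five times because requests for offsets 2 through 6 all resolve to the record at offset 6.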
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211805821 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. 
+ producer.beginTransaction() + (6 to 10).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (11 to 15).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211805409 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { +// This test will cover the following cases: +// 1. the whole batch contains no data messages +// 2. the first offset in a batch is not a committed data message +// 3. the last offset in a batch is not a committed data message +// 4. there is a gap in the middle of a batch + +val topic = newTopic() +testUtils.createTopic(topic, partitions = 1) + +val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) +val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] +val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + +val clock = new StreamManualClock + +val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { +if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) +} + } + if (q.exception.isDefined) { +throw q.exception.get + } + true +} + +val producer = testUtils.createProducer(usingTrascation = true) +try { + producer.initTransactions() + + testStream(mapped)( +StartStream(ProcessingTime(100), clock), +waitUntilBatchProcessed, +// 1 from smallest, 1 from middle, 8 from biggest +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +// Should not see any uncommitted messages +CheckAnswer(), +WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 3: _*), // offset 0, 1, 2 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message] +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages and abort the transaction. They should not be read. 
+ producer.beginTransaction() + (6 to 10).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8* +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11* +WithKafkaProducer(topic, producer) { producer => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (11 to 15).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +}, +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14 +AdvanceManualClock(100), +waitUntilBatchProcessed, +CheckAnswer((1 to 5) ++ (
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211804704 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { +def apply( +topic: String, +producer: KafkaProducer[String, String])( --- End diff -- Why pass the producer when all you are doing is passing it to the function? The function can refer to it on its own.
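The point can be shown without Kafka. In this hypothetical sketch, `FakeProducer` stands in for `KafkaProducer[String, String]`; `withProducer` mirrors the current shape of `WithKafkaProducer`, and `deferred` mirrors the suggested one, where the action simply refers to the producer it closes over. In the real test the action would be wrapped in a `StreamTest` action such as `AssertOnQuery` or `Execute` rather than returned as a plain function.

```scala
object ClosureSketch {
  // Hypothetical stand-in for KafkaProducer[String, String].
  final class FakeProducer {
    val sent = scala.collection.mutable.ArrayBuffer.empty[String]
    def send(value: String): Unit = { sent += value; () }
  }

  // Current shape: the producer is passed in only so it can be handed back to `func`.
  def withProducer(producer: FakeProducer)(func: FakeProducer => Unit): () => Unit =
    () => func(producer)

  // Suggested shape: the by-name action closes over whatever producer it uses
  // and is evaluated only when the returned function runs.
  def deferred(func: => Unit): () => Unit =
    () => func
}
```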
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211803763 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithKafkaProducer { +def apply( +topic: String, +producer: KafkaProducer[String, String])( +func: KafkaProducer[String, String] => Unit): AssertOnQuery = { + AssertOnQuery(_ => { --- End diff -- nit: use `Execute`.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211805993 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala --- @@ -327,6 +332,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props } + def createProducer(usingTrascation: Boolean): KafkaProducer[String, String] = { --- End diff -- nit: usingTrascation -> usingTra**n**saction
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 I did. Fixed the import.
[GitHub] spark issue #22175: [MINOR] Added import to fix compilation
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22175 Merged as compilation passed.
[GitHub] spark issue #22175: [MINOR] Added import to fix compilation
Github user tdas commented on the issue: https://github.com/apache/spark/pull/22175 jenkins retest this please
[GitHub] spark pull request #22175: [MINOR] Added import to fix compilation
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/22175 [MINOR] Added import to fix compilation ## What changes were proposed in this pull request? Two back-to-back PRs implicitly conflicted: one PR removed an existing import that the other PR needed. This did not cause an explicit merge conflict because the import already existed but was not used. ## How was this patch tested? It compiles! You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark fix-build Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22175.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22175 commit d540d5af2b6ca6f0b09ebe1a36da640c3e48aea8 Author: Tathagata Das Date: 2018-08-21T23:15:55Z Added import to fix compilation
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 Unfortunately, this PR broke the master build. It looks like some import probably got removed in the other PR I merged, which didn't create any direct conflict.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 Merged to master.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 LGTM.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 @HeartSaVioR I think I agree with the second approach that you suggested. So `memoryUsedBytes` => `size for total memory usage of loaded versions` and `customMetric` => `size for memory usage of latest version`
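A minimal sketch of the two metrics as described here, assuming a provider that tracks an estimated byte size for each loaded state version. The object name, the `Map[Long, Long]` (version to bytes) input, and the `Metrics` fields are hypothetical illustrations, not Spark's `StateStore` API.

```scala
// Hypothetical model: one state partition with several versions loaded in
// memory, each with an estimated size in bytes. Not Spark's actual API.
object StateMemoryMetrics {
  final case class Metrics(memoryUsedBytes: Long, latestVersionBytes: Long)

  def compute(loadedVersions: Map[Long, Long]): Metrics = {
    // memoryUsedBytes: total over every loaded version of the partition.
    val total = loadedVersions.values.sum
    // Custom metric: size of the most recent version only.
    val latest =
      if (loadedVersions.isEmpty) 0L
      else loadedVersions(loadedVersions.keys.max)
    Metrics(memoryUsedBytes = total, latestVersionBytes = latest)
  }
}
```

For example, three loaded versions of 100, 120 and 150 bytes would report 370 bytes as `memoryUsedBytes` and 150 bytes as the latest-version metric.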
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21733 Good point. That can be a minor PR.
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21733 LGTM. Will merge when tests pass. :)
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21733 This looks good!! Only one comment: please don't add the .crc files. They are useless and add unnecessary clutter.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r210422755 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread -/** - * An exception to indicate there is a missing offset in the records returned by Kafka consumer. - * This means it's either a transaction (commit or abort) marker, or an aborted message if - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. - */ -private[kafka010] class MissingOffsetException( -val offset: Long, -val nextOffsetToFetch: Long) extends Exception( - s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") - private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `true`, this --- End diff -- Will we throw an exception even when it's a control message and there is no real data loss?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r210423180 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer( kafkaParams: ju.Map[String, Object]) extends Logging { import InternalKafkaConsumer._ + /** + * The internal object returned by the `fetchData` method. If `record` is empty, it means it is + * invisible (either a transaction message, or an aborted message when the consumer's + * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch + * instead. + */ + private case class FetchedRecord( +record: Option[ConsumerRecord[Array[Byte], Array[Byte]]], --- End diff -- Can't we reuse the objects here? And do we need an Option, thus creating a lot of Option objects all the time?
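One way to act on this, sketched under assumptions: a single mutable holder is reused for every fetch, and a `null` record (rather than `None`) marks an invisible message, so neither an `Option` nor a new holder is allocated per record. `Record` is a hypothetical stand-in for Kafka's `ConsumerRecord`, and `withRecord`/`isInvisible` are illustrative names, not code from this PR.

```scala
object ReusableFetchedRecord {
  // Hypothetical stand-in for ConsumerRecord[Array[Byte], Array[Byte]].
  final case class Record(offset: Long, value: String)

  // A single instance is reused across fetches. `record == null` means the
  // fetched offset held an invisible message (transaction marker or aborted
  // record) and the caller should continue from `nextOffsetToFetch`.
  final class FetchedRecord(var record: Record, var nextOffsetToFetch: Long) {
    def withRecord(r: Record, next: Long): FetchedRecord = {
      record = r
      nextOffsetToFetch = next
      this // return the same instance instead of allocating a new one
    }

    def isInvisible: Boolean = record == null
  }
}
```

The trade-off is that a caller must finish with the holder before the next fetch overwrites it, which seems acceptable if only one thread uses a given consumer at a time.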
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r210985375 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer( consumer.seek(topicPartition, offset) } - private def poll(pollTimeoutMs: Long): Unit = { + /** + * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`. + * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible + * messages (either transaction messages, or aborted messages when `isolation.level` is + * `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. + */ + private def poll(offset: Long, pollTimeoutMs: Long): Unit = { +seek(offset) val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") -fetchedData = r.iterator +offsetAfterPoll = consumer.position(topicPartition) --- End diff -- I strongly think that this should not be a var, but rather a clear return value. We have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter), and we should consciously try to improve this everywhere by not having vars all over the place.
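A minimal sketch of the return-value style suggested here, assuming a simulated partition log instead of a real `KafkaConsumer`: `poll` returns an immutable `PollResult` holding both the visible records and the position after the poll, so neither needs to live in a `var`. All names are hypothetical; offsets missing from `log` stand for invisible messages.

```scala
object PollResultSketch {
  final case class Record(offset: Long, value: String)

  // Everything one poll produced, returned instead of assigned to vars.
  final case class PollResult(records: Seq[Record], offsetAfterPoll: Long)

  // Simulated fetch of up to `maxRecords` offsets starting at `offset`.
  // Offsets absent from `log` model transaction markers or aborted
  // messages: they advance the position but yield no visible record.
  def poll(log: Map[Long, String], offset: Long, maxRecords: Int, endOffset: Long): PollResult = {
    val scanned = (offset until endOffset).take(maxRecords)
    val visible = scanned.collect { case o if log.contains(o) => Record(o, log(o)) }
    val after = if (scanned.isEmpty) offset else scanned.last + 1
    PollResult(visible, after)
  }
}
```

With this shape, a poll that scans only invisible offsets is simply a `PollResult` with empty `records` and an advanced `offsetAfterPoll`, which the caller can inspect without any extra mutable state.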
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r210422521 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,22 +31,21 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread -/** - * An exception to indicate there is a missing offset in the records returned by Kafka consumer. - * This means it's either a transaction (commit or abort) marker, or an aborted message if - * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are - * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. - */ -private[kafka010] class MissingOffsetException( -val offset: Long, -val nextOffsetToFetch: Long) extends Exception( - s"Offset $offset is missing. The next offset to fetch is: $nextOffsetToFetch") - private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is `false`, it will --- End diff -- If failOnDataLoss is *true*, then it should throw an exception, shouldn't it? nit: "try the best" => "try its best"
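The behavior the reviewer expects can be sketched as a small policy function, assuming lost offsets have already been detected as the range `[offset, nextAvailable)`: fail when `failOnDataLoss` is true, otherwise warn and continue from the next available offset. The function name, the `warn` callback, and the choice of `IllegalStateException` are hypothetical, not the connector's actual code.

```scala
object DataLossPolicy {
  // Returns the offset to continue from, or throws when data loss
  // must fail the query (failOnDataLoss = true).
  def onDataLoss(offset: Long, nextAvailable: Long, failOnDataLoss: Boolean,
                 warn: String => Unit): Long = {
    val msg = s"Offsets [$offset, $nextAvailable) are no longer available"
    if (failOnDataLoss) {
      throw new IllegalStateException(msg)
    }
    warn(s"$msg; skipping them")
    nextAvailable
  }
}
```

This sketch only covers real data loss; skipped transaction markers or aborted messages would need to be filtered out before reaching this policy, so they never trigger a failure.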
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209479417 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) --- End diff -- So MissingOffsetException is only used to signal that some offsets may be missing due to control messages, and nothing else. And the higher-level function (i.e. `get`) just handles it by resetting the fetched offsets. Why not let this `fetchData` method handle the situation instead of creating a new exception just for this?
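A sketch of handling the skip locally rather than through an exception, under assumptions: `fetch` is a hypothetical function that returns the visible value at an offset (or `None` for an invisible one) together with the next offset to try, and the loop walks forward within `[offset, untilOffset)` until it finds a visible record. This illustrates the suggestion; it is not the code that was eventually merged.

```scala
import scala.annotation.tailrec

object SkipInvisibleSketch {
  // Hypothetical fetch result: `value` is None for an invisible message.
  final case class Fetched(value: Option[String], next: Long)

  // Returns the first visible (offset, value) in [offset, untilOffset), or
  // None if the range holds only invisible messages. Skipping happens here,
  // so no exception is needed to tell the caller to move forward.
  @tailrec
  def get(fetch: Long => Fetched, offset: Long, untilOffset: Long): Option[(Long, String)] = {
    if (offset >= untilOffset) None
    else {
      val f = fetch(offset)
      require(f.next > offset, s"fetch must advance past $offset")
      f.value match {
        case Some(v) => Some((offset, v))
        case None    => get(fetch, f.next, untilOffset)
      }
    }
  }
}
```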
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209479551 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala --- @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum --- End diff -- Good catch. That would never have occurred to me!
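The removed `count()` override presumably stopped being correct because, as this PR's title says, offsets can be non-consecutive: summing offset-range sizes over-counts whenever some offsets hold transaction markers or aborted messages. A small calculation makes this concrete; `OffsetRange` and the `visible` predicate here are hypothetical simplifications.

```scala
object CountSketch {
  final case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  // What the removed override computed: one record per offset.
  def countByOffsets(ranges: Seq[OffsetRange]): Long = ranges.map(_.size).sum

  // Actual record count when only offsets satisfying `visible` carry data.
  def countVisible(ranges: Seq[OffsetRange], visible: Long => Boolean): Long =
    ranges.map(r => (r.fromOffset until r.untilOffset).count(visible).toLong).sum
}
```

For a single range `[0, 5)` whose offset 4 is a commit marker, `countByOffsets` gives 5 while only 4 records are actually readable.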
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209475048 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { --- End diff -- It's hard to understand this condition because it's hard to understand what offsetAfterPoll means. Does it refer to the offset that will be fetched next by the KafkaConsumer?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209476712 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -347,9 +391,12 @@ private[kafka010] case class InternalKafkaConsumer( } private def poll(pollTimeoutMs: Long): Unit = { +offsetBeforePoll = consumer.position(topicPartition) --- End diff -- This variable `offsetBeforePoll` seems to be used only to identify whether data was actually fetched in a poll, and nothing else. Rather than define another var (there are already many, and they are already confusing), why not just return a boolean from poll that is true or false depending on whether the poll moved anything?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473392 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +/** + * An exception to indicate there is a missing offset in the records returned by Kafka consumer. + * This means it's either a transaction (commit or abort) marker, or an aborted message if + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. + */ +private[kafka010] class MissingOffsetException( --- End diff -- nit: Is this meant to be used outside this KafkaDataConsumer class? If not, then maybe make it an inner class of KafkaDataConsumer.
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209477156 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position + // will be changed and we can use it to determine this case. val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else { + } else if (offsetBeforePoll == offsetAfterPoll) { --- End diff -- Just to be clear, can this happen only if there is a timeout?? 
And if so, why not push this condition and exception into the poll() method, thus simplifying this method?
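Assuming the answer is yes, the timeout check could move into `poll`, as in this sketch: if the consumer's position has not moved, nothing at all (not even an invisible message) arrived before the timeout, so `poll` throws and its callers only ever see records or an advanced position. `SimpleConsumer` is a hypothetical abstraction over `KafkaConsumer`, and the consumer is assumed to be positioned at `offset` already.

```scala
import java.util.concurrent.TimeoutException

object PollTimeoutSketch {
  // Hypothetical consumer: `position` is the next offset it will read and
  // `fetch` may advance it, returning the offsets of visible records.
  trait SimpleConsumer {
    def position: Long
    def fetch(timeoutMs: Long): Seq[Long]
  }

  // Assumes the consumer is already positioned at `offset`.
  def poll(consumer: SimpleConsumer, offset: Long, timeoutMs: Long): Seq[Long] = {
    val records = consumer.fetch(timeoutMs)
    if (consumer.position == offset) {
      // The position did not move: nothing was fetched before the timeout.
      throw new TimeoutException(
        s"Cannot fetch record for offset $offset in $timeoutMs milliseconds")
    }
    records
  }
}
```

An empty result with a moved position then unambiguously means "only invisible messages were fetched", which removes the need for `offsetBeforePoll`.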
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209476548 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position --- End diff -- date => data
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209478033 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. +if (offset != nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) poll(pollTimeoutMs) +} else if (!fetchedData.hasNext) { + // The last pre-fetched data has been drained. + if (offset < offsetAfterPoll) { +// Offsets in [offset, offsetAfterPoll) are missing. We should skip them. +resetFetchedData() +throw new MissingOffsetException(offset, offsetAfterPoll) + } else { +seek(offset) +poll(pollTimeoutMs) + } } if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: + // We cannot fetch anything after `poll`. Three possible cases: // - `offset` is out of range so that Kafka returns nothing. Just throw // `OffsetOutOfRangeException` to let the caller handle it. // - Cannot fetch any data before timeout. TimeoutException will be thrown. + // - Fetched something but all of them are not valid date messages. In this case, the position + // will be changed and we can use it to determine this case. 
val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else { + } else if (offsetBeforePoll == offsetAfterPoll) { throw new TimeoutException( s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + } else { +assert(offset <= offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to $offsetAfterPoll") +throw new MissingOffsetException(offset, offsetAfterPoll) } } else { --- End diff -- Let's remove this else and reduce the condition nesting. The previous `if` statement always ends in an exception, so we can remove this else.
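A sketch of the flatter structure, under simplifying assumptions: every branch of the "nothing fetched" check throws, so the normal path follows the `if` without an `else`. Standard exceptions stand in for Kafka's `OffsetOutOfRangeException` and the PR's `MissingOffsetException`, and `positionMoved` is a hypothetical flag replacing the `offsetBeforePoll == offsetAfterPoll` comparison.

```scala
import java.util.concurrent.TimeoutException

object FlatNestingSketch {
  final case class AvailableRange(earliest: Long, latest: Long)

  def nextRecord(buffered: Iterator[String], offset: Long, range: AvailableRange,
                 positionMoved: Boolean, timeoutMs: Long): String = {
    if (!buffered.hasNext) {
      // Each case below ends in a throw, so no `else` is needed afterwards.
      if (offset < range.earliest || offset >= range.latest) {
        throw new IndexOutOfBoundsException(s"Offset $offset is out of range")
      }
      if (!positionMoved) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $timeoutMs milliseconds")
      }
      throw new IllegalStateException(s"Offset $offset is missing")
    }
    // Normal path: a buffered record is available.
    buffered.next()
  }
}
```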
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473432 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -31,6 +31,17 @@ import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread +/** + * An exception to indicate there is a missing offset in the records returned by Kafka consumer. + * This means it's either a transaction (commit or abort) marker, or an aborted message if + * "isolation.level" is "read_committed". The offsets in the range [offset, nextOffsetToFetch) are + * missing. In order words, `nextOffsetToFetch` indicates the next offset to fetch. + */ +private[kafka010] class MissingOffsetException( +val offset: Long, --- End diff -- Maybe rename offset to something like missingOffset. It's weird to have a generically named field "offset" next to a specifically named field "nextOffsetToFetch".
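Combining this rename with the nit above about scoping, the exception might look like the following sketch, nested in an object so that it is not exposed at package level. `KafkaDataConsumerSketch` is a hypothetical name; the message format is copied from the diff.

```scala
object KafkaDataConsumerSketch {
  // Scoped inside the consumer's object, with the generic `offset` field
  // renamed so that both fields describe their role.
  final class MissingOffsetException(val missingOffset: Long, val nextOffsetToFetch: Long)
    extends Exception(
      s"Offset $missingOffset is missing. The next offset to fetch is: $nextOffsetToFetch")
}
```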
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209474755 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer( untilOffset: Long, --- End diff -- Can you update the docs of this method to say that it can throw MissingOffsetException, and what that means?
[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r209473316 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer( ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET --- End diff -- Can you add some docs to explain what these 2 vars signify and why they are needed?
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21469 I am having second thoughts about this. Exposing the entire memory usage of all the loaded maps as another custom metric just adds more confusion. Rather, the point of the main state metric `memoryUsedBytes` is to capture how much memory is occupied by one partition of the state, and that should implicitly cover all the loaded versions of that state partition. So I strongly feel that instead of adding a custom metric, we should change the existing `memoryUsedBytes` to capture all the memory. I am fine with adding the custom metrics for hit and miss counts. No questions about that. What do you think?
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208485230 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() + +override def remove(store: StateStore, key: UnsafeRow): Unit = store.remove(key) + +override def keys(store: StateStore): Iterator[UnsafeRow] = { + // discard and don't convert values to avoid computation + store.getRange(None, None).map(_.key) +} + } + + /** + * The implementation of StreamingAggregationStateManager for state version 1. 
+ * In state version 1, the schema of key and value in state are follow: + * + * - key: Same as key expressions. + * - value: Same as input row attributes. The schema of value contains key
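The difference between the two state formats (the PR title says version 2 removes the redundant key data from the value) can be sketched with rows modeled as column-name maps, a simplification of Spark's `UnsafeRow`. The object and method names are hypothetical, not the `StreamingAggregationStateManager` implementations.

```scala
object StateFormatSketch {
  // Rows modeled as column name -> value; Spark uses UnsafeRow instead.
  type Row = Map[String, Any]

  def getKey(row: Row, keyCols: Seq[String]): Row =
    row.filter { case (col, _) => keyCols.contains(col) }

  // Version 1: the stored value is the whole input row, key columns included.
  def valueV1(row: Row): Row = row

  // Version 2: key columns are dropped, since the state key already holds them.
  def valueV2(row: Row, keyCols: Seq[String]): Row =
    row.filterNot { case (col, _) => keyCols.contains(col) }

  // Reading version 2 state rebuilds the full row from key and value.
  def restoreV2(key: Row, value: Row): Row = key ++ value
}
```

Version 1 needs no join when reading state back, while version 2 trades that join for storing each key's columns only once.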
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208483760 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow --- End diff -- nit: why is the input typed InternalRow when everything else is UnsafeRow? Seems inconsistent.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208482355 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. --- End diff -- super nit: some of these can be compressed to a single line doc.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208491512 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() + +override def remove(store: StateStore, key: UnsafeRow): Unit = store.remove(key) + +override def keys(store: StateStore): Iterator[UnsafeRow] = { + // discard and don't convert values to avoid computation + store.getRange(None, None).map(_.key) +} + } + + /** + * The implementation of StreamingAggregationStateManager for state version 1. 
+ * In state version 1, the schema of key and value in state are follow: + * + * - key: Same as key expressions. + * - value: Same as input row attributes. The schema of value contains key
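For illustration, the two value layouts can be compared with a small self-contained model. This is a hedged sketch, not the PR's code: rows are modeled as `Vector[Int]`, the state store as a `mutable.Map`, and the names `StateFormatSketch`, `Manager`, `V1` and `V2` are hypothetical. V1 stores the whole input row as the value, so key columns appear twice. V2, as the test suite in this PR describes it, stores only the non-key columns and rebuilds the full row on `get`.

```scala
import scala.collection.mutable

// Hypothetical model of the V1 and V2 state layouts; not Spark's implementation.
object StateFormatSketch {
  type Row = Vector[Int]
  type Store = mutable.Map[Row, Row]

  sealed trait Manager {
    def keyOrdinals: Seq[Int]
    // Project the key columns out of a full input row.
    def getKey(row: Row): Row = keyOrdinals.map(i => row(i)).toVector
    def put(store: Store, row: Row): Unit
    // Both formats return the full input row, whatever is physically stored.
    def get(store: Store, key: Row): Option[Row]
  }

  // V1: the value is the whole input row, so key columns are stored twice.
  final case class V1(keyOrdinals: Seq[Int]) extends Manager {
    def put(store: Store, row: Row): Unit = store.update(getKey(row), row)
    def get(store: Store, key: Row): Option[Row] = store.get(key)
  }

  // V2: the value holds only the non-key columns; get() re-joins key and value.
  final case class V2(keyOrdinals: Seq[Int], width: Int) extends Manager {
    private val valueOrdinals: Seq[Int] =
      (0 until width).filterNot(i => keyOrdinals.contains(i))

    def put(store: Store, row: Row): Unit =
      store.update(getKey(row), valueOrdinals.map(i => row(i)).toVector)

    def get(store: Store, key: Row): Option[Row] =
      store.get(key).map { value =>
        // Place key and value columns back at their original ordinals.
        val out = Array.fill(width)(0)
        keyOrdinals.zip(key).foreach { case (i, v) => out(i) = v }
        valueOrdinals.zip(value).foreach { case (i, v) => out(i) = v }
        out.toVector
      }
  }
}
```

With keys at ordinals 0 and 1, putting `Vector(1, 2, 10, 20)` stores the full row under V1 but only `Vector(10, 20)` under V2, and both return the full row from `get`.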
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208492158 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -167,6 +165,18 @@ trait WatermarkSupport extends UnaryExecNode { } } } + + protected def removeKeysOlderThanWatermark(storeManager: StreamingAggregationStateManager, --- End diff -- incorrect indent of parameters
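The quoted declaration is cut off after its first parameter, so the rest of its signature is unknown here. As a sketch of the layout Spark code usually follows for wrapped parameter lists (per the Databricks Scala style guide: break after the opening parenthesis, one parameter per line, 4-space indent relative to `def`), here is a hypothetical method; `removeKeysOlderThan`, its parameters and its body are illustrative only.

```scala
import scala.collection.mutable

object IndentSketch {
  // Wrapped parameters: break after "(", one parameter per line,
  // indented 4 spaces relative to `def` (2 more than the body).
  def removeKeysOlderThan(
      store: mutable.Map[Long, String],
      watermarkMs: Long): Int = {
    // Collect first so the map is not modified while iterating over its keys.
    val expired = store.keys.filter(_ < watermarkMs).toList
    expired.foreach(k => store.remove(k))
    expired.size
  }
}
```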
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208488566 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() --- End diff -- This really does not need to be in this interface, as it is not customized and is unlikely ever to be customized across implementations.
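A minimal sketch of what the reviewer suggests, assuming the manager keeps only the format-dependent operations and the operator calls `commit()` on the store directly. `ToyStore`, `Manager`, `SimpleManager` and `runBatch` are hypothetical stand-ins, not Spark APIs.

```scala
import scala.collection.mutable

object CommitOutsideManagerSketch {
  // Hypothetical stand-in for a state store: commit() returns the new version.
  final class ToyStore {
    val data: mutable.Map[String, Long] = mutable.Map.empty
    private var version = 0L
    def commit(): Long = { version += 1; version }
  }

  // The manager only owns operations whose behavior depends on the state format.
  trait Manager {
    def put(store: ToyStore, key: String, value: Long): Unit
  }

  object SimpleManager extends Manager {
    def put(store: ToyStore, key: String, value: Long): Unit =
      store.data.update(key, value)
  }

  // The caller applies updates through the manager, then commits on the store itself.
  def runBatch(manager: Manager, store: ToyStore, rows: Seq[(String, Long)]): Long = {
    rows.foreach { case (k, v) => manager.put(store, k, v) }
    store.commit()
  }
}
```

Since every implementation would delegate to the store anyway, leaving `commit` off the trait keeps the interface limited to what actually varies between state formats.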
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208491168 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StreamingAggregationStateManagerSuite extends StreamTest { + // fields and method for test data + + val testKeys: Seq[String] = Seq("key1", "key2") + val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)") + + val testOutputSchema: StructType = StructType( +testKeys.map(createIntegerField) ++ testValues.map(createIntegerField)) + + val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes + val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testKeys.contains(p.name) + } + val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testValues.contains(p.name) + } + val expectedTestValuesSchema: StructType = testValuesAttributes.toStructType + + val testRow: UnsafeRow = { +val unsafeRowProjection = UnsafeProjection.create(testOutputSchema) +val row = unsafeRowProjection(new SpecificInternalRow(testOutputSchema)) +(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => row.setInt(index, index) } +row + } + + val expectedTestKeyRow: UnsafeRow = { +val keyProjector = GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes) +keyProjector(testRow) + } + + val expectedTestValueRowForV2: UnsafeRow = { +val valueProjector = GenerateUnsafeProjection.generate(testValuesAttributes, + testOutputAttributes) +valueProjector(testRow) + } + + private def createIntegerField(name: String): StructField = { +StructField(name, IntegerType, nullable = false) + } + + // StateManagerImplV1 + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(testKeyAttributes, testOutputAttributes, 1) 
+ +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow, + expectedTestKeyRow, testRow) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(testKeyAttributes, testOutputAttributes, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, expectedTestValuesSchema, testRow, + expectedTestKeyRow, expectedTestValueRowForV2) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + inputRowAttributes: Seq[Attribute], + version: Int): StreamingAggregationStateManager = { +StreamingAggregationStateManager.createStateManager(keysAttributes, inputRowAttributes, version) + } + + private def testGetPutIterOnStateManager( + stateManager: StreamingAggregationStateManager, + expectedValueSchema: StructType, + inputRow: UnsafeRow
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208487198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() + +override def remove(store: StateStore, key: UnsafeRow): Unit = store.remove(key) + +override def keys(store: StateStore): Iterator[UnsafeRow] = { + // discard and don't convert values to avoid computation + store.getRange(None, None).map(_.key) +} + } + + /** + * The implementation of StreamingAggregationStateManager for state version 1. 
+ * In state version 1, the schema of key and value in state are follow: + * + * - key: Same as key expressions. + * - value: Same as input row attributes. The schema of value contains key
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208486837 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() + +override def remove(store: StateStore, key: UnsafeRow): Unit = store.remove(key) + +override def keys(store: StateStore): Iterator[UnsafeRow] = { + // discard and don't convert values to avoid computation + store.getRange(None, None).map(_.key) +} + } + + /** + * The implementation of StreamingAggregationStateManager for state version 1. 
+ * In state version 1, the schema of key and value in state are follow: + * + * - key: Same as key expressions. + * - value: Same as input row attributes. The schema of value contains key
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208496351 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -201,33 +211,37 @@ object WatermarkSupport { case class StateStoreRestoreExec( keyExpressions: Seq[Attribute], stateInfo: Option[StatefulOperatorStateInfo], +stateFormatVersion: Int, child: SparkPlan) extends UnaryExecNode with StateStoreReader { + private[sql] val stateManager = StreamingAggregationStateManager.createStateManager( +keyExpressions, child.output, stateFormatVersion) + override protected def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithStateStore( getStateInfo, keyExpressions.toStructType, - child.output.toStructType, + stateManager.getStateValueSchema, indexOrdinal = None, sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => -val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) val hasInput = iter.hasNext if (!hasInput && keyExpressions.isEmpty) { // If our `keyExpressions` are empty, we're getting a global aggregation. In that case // the `HashAggregateExec` will output a 0 value for the partial merge. We need to // restore the value, so that we don't overwrite our state with a 0 value, but rather // merge the 0 with existing state. + // In this case the value should represent origin row, so no need to restore. --- End diff -- What does this mean? I think this is not needed anymore.
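In this diff, `StateStoreRestoreExec` passes `stateManager.getStateValueSchema` to `mapPartitionsWithStateStore` instead of `child.output.toStructType`, so the value schema now depends on the state format version. The sketch below models that dispatch under simplifying assumptions: a schema is just an ordered list of column names (a stand-in for `StructType`), and the names in `ValueSchemaSketch` are hypothetical. The version match mirrors `createStateManager` in the quoted diff, and the column names are borrowed from the test suite in this PR.

```scala
object ValueSchemaSketch {
  // Stand-in for StructType: an ordered list of column names.
  type Schema = Seq[String]

  sealed trait Manager {
    def stateValueSchema: Schema
  }

  // V1: the value schema is the full input schema.
  final case class V1(keys: Schema, input: Schema) extends Manager {
    def stateValueSchema: Schema = input
  }

  // V2: the value schema omits the key columns.
  final case class V2(keys: Schema, input: Schema) extends Manager {
    def stateValueSchema: Schema = input.filterNot(c => keys.contains(c))
  }

  // Same version dispatch as createStateManager in the quoted diff.
  def createStateManager(keys: Schema, input: Schema, version: Int): Manager =
    version match {
      case 1 => V1(keys, input)
      case 2 => V2(keys, input)
      case _ => throw new IllegalArgumentException(s"Version $version is invalid")
    }
}
```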
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208490526 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StreamingAggregationStateManagerSuite extends StreamTest { + // fields and method for test data + + val testKeys: Seq[String] = Seq("key1", "key2") + val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)") + + val testOutputSchema: StructType = StructType( +testKeys.map(createIntegerField) ++ testValues.map(createIntegerField)) + + val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes + val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testKeys.contains(p.name) + } + val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testValues.contains(p.name) + } + val expectedTestValuesSchema: StructType = testValuesAttributes.toStructType + + val testRow: UnsafeRow = { +val unsafeRowProjection = UnsafeProjection.create(testOutputSchema) +val row = unsafeRowProjection(new SpecificInternalRow(testOutputSchema)) +(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => row.setInt(index, index) } +row + } + + val expectedTestKeyRow: UnsafeRow = { +val keyProjector = GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes) +keyProjector(testRow) + } + + val expectedTestValueRowForV2: UnsafeRow = { +val valueProjector = GenerateUnsafeProjection.generate(testValuesAttributes, + testOutputAttributes) +valueProjector(testRow) + } + + private def createIntegerField(name: String): StructField = { +StructField(name, IntegerType, nullable = false) + } + + // StateManagerImplV1 + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(testKeyAttributes, testOutputAttributes, 1) 
+ +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow, + expectedTestKeyRow, testRow) + } + + // StateManagerImplV2 + test("StateManager v2 - get, put, iter") { +val stateManager = newStateManager(testKeyAttributes, testOutputAttributes, 2) + +// in V2, row for values itself (excluding keys from input row) is stored as value +// so that stored value doesn't have key part, but state manager V2 will provide same output +// as V1 when getting row for key +testGetPutIterOnStateManager(stateManager, expectedTestValuesSchema, testRow, + expectedTestKeyRow, expectedTestValueRowForV2) + } + + private def newStateManager( + keysAttributes: Seq[Attribute], + inputRowAttributes: Seq[Attribute], + version: Int): StreamingAggregationStateManager = { +StreamingAggregationStateManager.createStateManager(keysAttributes, inputRowAttributes, version) --- End diff -- This is an absolutely shallow method that just passes the exact same parameters to another method. What's the point of it?
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208490973 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class StreamingAggregationStateManagerSuite extends StreamTest { + // fields and method for test data + + val testKeys: Seq[String] = Seq("key1", "key2") + val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)") + + val testOutputSchema: StructType = StructType( +testKeys.map(createIntegerField) ++ testValues.map(createIntegerField)) + + val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes + val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testKeys.contains(p.name) + } + val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { p => +testValues.contains(p.name) + } + val expectedTestValuesSchema: StructType = testValuesAttributes.toStructType + + val testRow: UnsafeRow = { +val unsafeRowProjection = UnsafeProjection.create(testOutputSchema) +val row = unsafeRowProjection(new SpecificInternalRow(testOutputSchema)) +(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => row.setInt(index, index) } +row + } + + val expectedTestKeyRow: UnsafeRow = { +val keyProjector = GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes) +keyProjector(testRow) + } + + val expectedTestValueRowForV2: UnsafeRow = { +val valueProjector = GenerateUnsafeProjection.generate(testValuesAttributes, + testOutputAttributes) +valueProjector(testRow) + } + + private def createIntegerField(name: String): StructField = { +StructField(name, IntegerType, nullable = false) + } + + // StateManagerImplV1 + + test("StateManager v1 - get, put, iter") { +val stateManager = newStateManager(testKeyAttributes, testOutputAttributes, 1) 
+ +// in V1, input row is stored as value +testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow, + expectedTestKeyRow, testRow) --- End diff -- nit: change the last param `testRow` to `expectedStateValue = testRow` to make clear what it means and to distinguish it from the earlier `testRow` param.
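A small illustration of the suggested call-site change using Scala named arguments. `describe` is a hypothetical helper with the same parameter shape as `testGetPutIterOnStateManager`; naming the last argument makes it explicit that the second `testRow` is the expected stored value (V1 stores the whole input row), not another input. The row values follow the suite's `setInt(index, index)` setup.

```scala
object NamedArgSketch {
  // Hypothetical helper with the same parameter shape as testGetPutIterOnStateManager.
  def describe(
      inputRow: Vector[Int],
      expectedKeyRow: Vector[Int],
      expectedStateValue: Vector[Int]): String =
    s"input=$inputRow key=$expectedKeyRow value=$expectedStateValue"

  val testRow: Vector[Int] = Vector(0, 1, 2, 3)

  // Positional: the second testRow reads like just another input row.
  val positional: String = describe(testRow, Vector(0, 1), testRow)

  // Named: the last argument is visibly the expected state value.
  val named: String = describe(testRow, Vector(0, 1), expectedStateValue = testRow)
}
```

Both calls are equivalent at runtime; the named form only changes how the call site reads.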
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208485192 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. + * @return The new state version. + */ +def commit(store: StateStore): Long + +/** + * Remove a single non-null key from the target state store. + * + * @param store The target StateStore instance. 
+ * @param key The key whose associated value is to be returned. + */ +def remove(store: StateStore, key: UnsafeRow): Unit + +/** + * Return an iterator containing all the key-value pairs in target state store. + */ +def iterator(store: StateStore): Iterator[UnsafeRowPair] + +/** + * Return an iterator containing all the keys in target state store. + */ +def keys(store: StateStore): Iterator[UnsafeRow] + +/** + * Return an iterator containing all the values in target state store. + */ +def values(store: StateStore): Iterator[UnsafeRow] + } + + object StreamingAggregationStateManager extends Logging { +val supportedVersions = Seq(1, 2) +val legacyVersion = 1 + +def createStateManager( +keyExpressions: Seq[Attribute], +inputRowAttributes: Seq[Attribute], +stateFormatVersion: Int): StreamingAggregationStateManager = { + stateFormatVersion match { +case 1 => new StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes) +case 2 => new StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes) +case _ => throw new IllegalArgumentException(s"Version $stateFormatVersion is invalid") + } +} + } + + abstract class StreamingAggregationStateManagerBaseImpl( + protected val keyExpressions: Seq[Attribute], + protected val inputRowAttributes: Seq[Attribute]) extends StreamingAggregationStateManager { + +@transient protected lazy val keyProjector = + GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes) + +def getKey(row: InternalRow): UnsafeRow = keyProjector(row) + +override def commit(store: StateStore): Long = store.commit() + +override def remove(store: StateStore, key: UnsafeRow): Unit = store.remove(key) + +override def keys(store: StateStore): Iterator[UnsafeRow] = { + // discard and don't convert values to avoid computation + store.getRange(None, None).map(_.key) +} + } + + /** + * The implementation of StreamingAggregationStateManager for state version 1. 
+ * In state version 1, the schema of key and value in state are follow: + * + * - key: Same as key expressions. + * - value: Same as input row attributes. The schema of value contains key
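The version 1 layout described in this diff stores the whole input row as the value. That means the key columns are stored twice: once as the key and again inside the value. The PR title suggests that version 2 drops that redundancy. Below is a minimal plain-Scala sketch of the two layouts. It has no Spark dependency, and `Row`, `Store`, `V1` and `V2` are hypothetical stand-ins for `UnsafeRow`, `StateStore` and the two manager implementations. It assumes version 2 stores only the non-key columns and rebuilds the full row on read.

```scala
import scala.collection.mutable

object AggStateLayoutSketch {
  // Hypothetical stand-ins: a row is a vector of column values, the store a key -> value map.
  type Row = Vector[Any]
  type Store = mutable.Map[Row, Row]

  sealed trait StateManager {
    def keyOrdinals: Seq[Int]
    /** Projects the key columns out of a full input row. */
    def getKey(row: Row): Row = keyOrdinals.map(row).toVector
    def put(store: Store, row: Row): Unit
    /** Returns the full input row (key and value columns) if the key exists. */
    def get(store: Store, key: Row): Option[Row]
  }

  /** Version 1 style: the value is the whole row, so key columns are duplicated. */
  final class V1(val keyOrdinals: Seq[Int]) extends StateManager {
    def put(store: Store, row: Row): Unit = store(getKey(row)) = row
    def get(store: Store, key: Row): Option[Row] = store.get(key)
  }

  /** Assumed version 2 style: the value omits key columns; get() stitches the row back. */
  final class V2(val keyOrdinals: Seq[Int], numColumns: Int) extends StateManager {
    private val valueOrdinals = (0 until numColumns).filterNot(i => keyOrdinals.contains(i))

    def put(store: Store, row: Row): Unit = store(getKey(row)) = valueOrdinals.map(row).toVector

    def get(store: Store, key: Row): Option[Row] = store.get(key).map { value =>
      val full = new Array[Any](numColumns)
      keyOrdinals.zip(key).foreach { case (ord, v) => full(ord) = v }
      valueOrdinals.zip(value).foreach { case (ord, v) => full(ord) = v }
      full.toVector
    }
  }
}
```

Both managers return the same full row from `get`, but `V2` keeps a shorter value per key. The trade-off is some extra work on every read to reassemble the row.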
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208486899 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208491615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala ---
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208483242 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. --- End diff -- nit: a lot of the `@param` and `@return` tags in the docs are a bit superfluous, as they just repeat what the main statement already says. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208481983 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** --- End diff -- Ummm, why is this in the package object and not in a separate file?? Is there any reason it has to be in the `state` package object when not all stateful operators require it, only streaming aggregation?
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208489469 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -871,6 +871,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = +buildConf("spark.sql.streaming.aggregation.stateFormatVersion") + .internal() + .doc("State format version used by streaming aggregation operations in a streaming query. " + +"State between versions are tend to be incompatible, so state format version shouldn't " + +"be modified after running.") + .intConf + .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") + .createWithDefault(2) --- End diff -- If you intend to change the default to the new version, then you HAVE TO add a test that ensures that existing streaming aggregation checkpoints (generated in Spark 2.3.1, for example) will not fail to recover. Something similar to this test: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala#L883
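One way to meet the compatibility requirement raised here is to resolve the state format version from the checkpoint rather than from the session conf alone. If an older checkpoint did not record a version, the resolver falls back to the legacy version. The sketch below is hypothetical: the object, the method, and the assumption that checkpointed confs are available as a `Map` are all illustrative. Only the conf key and the valid versions 1 and 2 come from the diff.

```scala
object StateFormatVersionSketch {
  // Conf key and valid versions taken from the SQLConf diff; everything else is illustrative.
  val ConfKey = "spark.sql.streaming.aggregation.stateFormatVersion"
  val SupportedVersions: Set[Int] = Set(1, 2)
  val LegacyVersion = 1

  /**
   * Picks the state format version for a query run.
   * `checkpointConf` is None for a brand-new query, or Some(confs) recorded by an existing checkpoint.
   */
  def resolve(checkpointConf: Option[Map[String, String]], sessionValue: Int): Int = {
    val version = checkpointConf match {
      case None       => sessionValue // fresh query: honor the session conf (default 2)
      case Some(conf) =>
        // Existing checkpoint: reuse its version; one written before the conf existed is legacy.
        conf.get(ConfKey).map(_.toInt).getOrElse(LegacyVersion)
    }
    require(SupportedVersions.contains(version), s"Valid versions are 1 and 2, got $version")
    version
  }
}
```

A recovery test along the lines requested would then check that a checkpoint written before this conf existed resolves to version 1, even though new queries default to 2.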
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208482479 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala --- @@ -81,4 +85,221 @@ package object state { storeCoordinator) } } + + /** + * Base trait for state manager purposed to be used from streaming aggregations. + */ + sealed trait StreamingAggregationStateManager extends Serializable { + +/** + * Extract columns consisting key from input row, and return the new row for key columns. + * + * @param row The input row. + * @return The row instance which only contains key columns. + */ +def getKey(row: InternalRow): UnsafeRow + +/** + * Calculate schema for the value of state. The schema is mainly passed to the StateStoreRDD. + * + * @return An instance of StructType representing schema for the value of state. + */ +def getStateValueSchema: StructType + +/** + * Get the current value of a non-null key from the target state store. + * + * @param store The target StateStore instance. + * @param key The key whose associated value is to be returned. + * @return A non-null row if the key exists in the store, otherwise null. + */ +def get(store: StateStore, key: UnsafeRow): UnsafeRow + +/** + * Put a new value for a non-null key to the target state store. Note that key will be + * extracted from the input row, and the key would be same as the result of getKey(inputRow). + * + * @param store The target StateStore instance. + * @param row The input row. + */ +def put(store: StateStore, row: UnsafeRow): Unit + +/** + * Commit all the updates that have been made to the target state store, and return the + * new version. + * + * @param store The target StateStore instance. --- End diff -- Superfluous. The main statement alone has all the information.
[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21733#discussion_r208492352 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -167,6 +165,18 @@ trait WatermarkSupport extends UnaryExecNode { } } } + + protected def removeKeysOlderThanWatermark(storeManager: StreamingAggregationStateManager, --- End diff -- Actually... where is this used? This does not seem to be used anywhere?
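For context on what the unused helper is presumably meant to do, here is a minimal sketch of watermark-based eviction. It is written against a plain `mutable.Map` instead of a `StateStore`. The key type, the millisecond event-time column, and the strict `<` comparison are assumptions; the diff's actual signature is truncated above.

```scala
import scala.collection.mutable

object WatermarkEvictionSketch {
  // Hypothetical key: a grouping value plus an event-time column in milliseconds.
  final case class AggKey(group: String, eventTimeMs: Long)

  /** Removes every key whose event time is older than the watermark; returns the count removed. */
  def removeKeysOlderThanWatermark(store: mutable.Map[AggKey, Long], watermarkMs: Long): Int = {
    // Materialize the expired keys first so the map is never mutated while being iterated.
    val expired = store.keys.filter(_.eventTimeMs < watermarkMs).toList
    expired.foreach(k => store.remove(k))
    expired.size
  }
}
```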
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r206761192 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L) + + registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L) + registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L) + --- End diff -- Those are custom metrics, which may or may not be present depending on the implementation of the state store. I don't recommend adding them here directly.
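The distinction drawn here is between metrics that every state operator reports and metrics that depend on the state store provider. Standard fields such as `numRowsTotal` and `memoryUsedBytes` are always reported. Custom metrics depend on the provider. The hypothetical sketch below respects that split by returning an `Option` for custom metrics. It also parses the ISO-8601 watermark string with `java.time.Instant` rather than a `SimpleDateFormat`. The case class and the metric name used in the usage check are made up for illustration.

```scala
import java.time.Instant

object ProgressGaugeSketch {
  // Hypothetical, simplified view of one state operator's progress entry.
  final case class StateOpProgress(
      numRowsTotal: Long,
      memoryUsedBytes: Long,
      customMetrics: Map[String, Long])

  /** Parses an ISO-8601 UTC watermark such as "2018-08-01T00:00:00.000Z"; 0 when absent. */
  def watermarkMillis(eventTime: Map[String, String]): Long =
    eventTime.get("watermark").map(s => Instant.parse(s).toEpochMilli).getOrElse(0L)

  /** Standard fields exist for every operator, so a plain sum is always meaningful. */
  def totalRows(ops: Seq[StateOpProgress]): Long = ops.map(_.numRowsTotal).sum

  /** Custom metrics depend on the state store provider, so the total is optional. */
  def customMetricTotal(ops: Seq[StateOpProgress], name: String): Option[Long] = {
    val reported = ops.flatMap(_.customMetrics.get(name))
    if (reported.isEmpty) None else Some(reported.sum)
  }
}
```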
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206739252 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato } object SQLMetrics { - private val SUM_METRIC = "sum" - private val SIZE_METRIC = "size" - private val TIMING_METRIC = "timing" - private val AVERAGE_METRIC = "average" + val SUM_METRIC = "sum" + val SIZE_METRIC = "size" + val TIMING_METRIC = "timing" + val AVERAGE_METRIC = "average" --- End diff -- why this change?
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206738287 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,24 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) private[sql] def jsonValue: JValue = { -("numRowsTotal" -> JInt(numRowsTotal)) ~ -("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { + if (map.isEmpty) return JNothing + val keys = map.keySet.asScala.toSeq.sorted + keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _) +} + +val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ + ("memoryUsedBytes" -> JInt(memoryUsedBytes)) + +if (!customMetrics.isEmpty) { --- End diff -- You are already handling the case of an empty map in `safeMapToJValue` by returning JNothing. Don't JNothing values just get dropped from the JSON text anyway?
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r206738919 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,24 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics) private[sql] def jsonValue: JValue = { -("numRowsTotal" -> JInt(numRowsTotal)) ~ -("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { --- End diff -- T is always Long. Why make a generic function for that? This does not even need a separate function.
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21357 While the new code looks a bit cleaner, I am a bit hesitant about this complete rewrite of such critical code. We generally refactor code only if there is some ultimate benefit, e.g., a performance improvement or making a new feature easier to add.