[GitHub] [spark] huanliwang-db commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStoreConf for its usage in RocksDBConf
huanliwang-db commented on code in PR #39069: URL: https://github.com/apache/spark/pull/39069#discussion_r1049307954 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -560,10 +560,11 @@ case class RocksDBConf( object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ - val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" + val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" private case class ConfEntry(name: String, default: String) { -def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def lowerCaseName: String = name.toLowerCase(Locale.ROOT) Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
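The renamed helpers in the diff above distinguish two lookup keys for the same conf entry: a full prefixed key for SQLConf and a bare lower-cased name for extraOptions. A hypothetical Python sketch of that lookup (illustrative names only, not the actual Spark API; the precedence of extraOptions over SQLConf is an assumption here, since the thread notes preference still has to be defined):

```python
# Illustrative sketch, not Spark code: resolve a RocksDB conf entry either from
# SQLConf (full prefixed, lower-cased key) or from extraOptions (bare name).
PREFIX = "spark.sql.streaming.stateStore.rocksdb"

def resolve(name, default, sql_confs, extra_options):
    sql_key = f"{PREFIX}.{name}".lower()   # e.g. sqlConfFullName in the diff
    short_key = name.lower()               # e.g. lowerCaseName in the diff
    if short_key in extra_options:         # assumed precedence, for illustration
        return extra_options[short_key]
    return sql_confs.get(sql_key, default)

print(resolve("lockAcquireTimeoutMs", "60000",
              {"spark.sql.streaming.statestore.rocksdb.lockacquiretimeoutms": "120000"},
              {}))  # 120000
```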
[GitHub] [spark] dongjoon-hyun commented on pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names
dongjoon-hyun commented on PR #39070: URL: https://github.com/apache/spark/pull/39070#issuecomment-1352674020 Thank you, @viirya !
[GitHub] [spark] viirya commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names
viirya commented on code in PR #39070: URL: https://github.com/apache/spark/pull/39070#discussion_r1049306579 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala: ## @@ -152,7 +152,7 @@ class ExecutorPodsAllocator( applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { -val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys) +val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct Review Comment: So `snapshots` may contain duplicates too?
[GitHub] [spark] zhengruifeng closed pull request #39013: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions
zhengruifeng closed pull request #39013: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions URL: https://github.com/apache/spark/pull/39013
[GitHub] [spark] zhengruifeng commented on pull request #39013: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions
zhengruifeng commented on PR #39013: URL: https://github.com/apache/spark/pull/39013#issuecomment-1352669725 Let me take over this PR in https://github.com/apache/spark/pull/39071
[GitHub] [spark] zhengruifeng opened a new pull request, #39071: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions
zhengruifeng opened a new pull request, #39071: URL: https://github.com/apache/spark/pull/39071 ### What changes were proposed in this pull request? Implement the rest of string/binary functions. The first commit is https://github.com/apache/spark/pull/38921. Among them, `format_number` has a data type mismatch issue and should be enabled with https://issues.apache.org/jira/browse/SPARK-41473. ### Why are the changes needed? For API coverage on Spark Connect. ### Does this PR introduce _any_ user-facing change? Yes. New functions are available on Spark Connect. ### How was this patch tested? Unit tests.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names
dongjoon-hyun commented on code in PR #39070: URL: https://github.com/apache/spark/pull/39070#discussion_r1049302741 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala: ## @@ -162,7 +162,7 @@ class ExecutorPodsAllocator( val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod => pod.getSpec.getVolumes.asScala .flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) } -} +}.distinct Review Comment: This is since Spark 3.2.0.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names
dongjoon-hyun commented on code in PR #39070: URL: https://github.com/apache/spark/pull/39070#discussion_r1049302331 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala: ## @@ -152,7 +152,7 @@ class ExecutorPodsAllocator( applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { -val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys) +val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct Review Comment: The original code is in Spark 3.1.2 and Spark 3.2.0.
[GitHub] [spark] dongjoon-hyun commented on pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique list of known executor IDs and PVC names
dongjoon-hyun commented on PR #39070: URL: https://github.com/apache/spark/pull/39070#issuecomment-1352661946 cc @viirya and @attilapiros
[GitHub] [spark] cloud-fan commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`
cloud-fan commented on PR #39017: URL: https://github.com/apache/spark/pull/39017#issuecomment-1352661404 > and then the python client side can generate all the sampled splits from this one. SGTM
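The idea being agreed on above is that the client can derive every split from a single random column by partitioning [0, 1) according to the normalized weights. A minimal Python sketch of that boundary computation (an illustrative sketch of the approach, not the actual Spark Connect client code):

```python
# Hypothetical sketch: turn randomSplit weights into per-split (lower, upper)
# boundaries on a single uniform random column, as discussed in the thread.
from itertools import accumulate

def split_boundaries(weights):
    total = sum(weights)
    normalized = [w / total for w in weights]      # weights need not sum to 1
    cumulative = [0.0] + list(accumulate(normalized))
    return list(zip(cumulative, cumulative[1:]))   # one (lo, hi) pair per split

print(split_boundaries([1, 2, 1]))  # [(0.0, 0.25), (0.25, 0.75), (0.75, 1.0)]
```

Each split would then be the rows whose random value falls in its half-open interval, so all splits come from one sampled column.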
[GitHub] [spark] dongjoon-hyun opened a new pull request, #39070: [SPARK-41525][K8S] Use unique list of known executor IDs and PVC names in onNewSnapshots
dongjoon-hyun opened a new pull request, #39070: URL: https://github.com/apache/spark/pull/39070 ### What changes were proposed in this pull request? This PR improves `ExecutorPodsAllocator.onNewSnapshots` by removing duplicates from `k8sKnownExecIds` and `k8sKnownPVCNames`. ### Why are the changes needed? The existing variables contain many duplicates because `snapshots` is `Seq[ExecutorPodsSnapshot]`. ``` val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys) ``` For example, if we print out the values, it looks like the following. ``` 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2 22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3 ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review because this is an improvement on the local variable computation.
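The dedup the PR adds via Scala's order-preserving `.distinct` can be sketched in Python (hypothetical data shapes, not the actual Spark classes):

```python
# Illustrative sketch: flatten executor IDs across snapshots and drop
# duplicates while keeping first-occurrence order, mirroring `.distinct`.
def known_exec_ids(snapshots):
    seen = dict.fromkeys(  # dict preserves insertion order in Python 3.7+
        eid for snap in snapshots for eid in snap["executor_pods"]
    )
    return list(seen)

snapshots = [
    {"executor_pods": {1: "pod-1", 2: "pod-2"}},
    {"executor_pods": {1: "pod-1", 2: "pod-2", 3: "pod-3"}},
]
print(known_exec_ids(snapshots))  # [1, 2, 3] rather than [1, 2, 1, 2, 3]
```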
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…
HeartSaVioR commented on code in PR #39069: URL: https://github.com/apache/spark/pull/39069#discussion_r1049288529 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala: ## @@ -71,11 +71,10 @@ class StateStoreConf( /** * Additional configurations related to state store. This will capture all configs in - * SQLConf that start with `spark.sql.streaming.stateStore.` and extraOptions for a specific - * operator. + * SQLConf that start with `spark.sql.streaming.stateStore.` */ - val confs: Map[String, String] = - sqlConf.getAllConfs.filter(_._1.startsWith("spark.sql.streaming.stateStore.")) ++ extraOptions + val stateStoreSQLConfs: Map[String, String] = Review Comment: nit: sqlConfs
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…
HeartSaVioR commented on code in PR #39069: URL: https://github.com/apache/spark/pull/39069#discussion_r1049287294 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -560,10 +560,11 @@ case class RocksDBConf( object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ - val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" + val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" private case class ConfEntry(name: String, default: String) { -def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def lowerCaseName: String = name.toLowerCase(Locale.ROOT) Review Comment: After the change, the method name does not need an additional `FromSQL` or `FromExtraOptions` since the parameter will make that clear.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…
HeartSaVioR commented on code in PR #39069: URL: https://github.com/apache/spark/pull/39069#discussion_r1049286005 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -560,10 +560,11 @@ case class RocksDBConf( object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ - val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" + val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb" private case class ConfEntry(name: String, default: String) { -def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT) +def lowerCaseName: String = name.toLowerCase(Locale.ROOT) Review Comment: What about having a different class for each case instead? It would be clearer to distinguish configs, e.g. we don't expect the same config to be set from both SQL conf and extraOptions. If we allow such a case, it'll be a bit more complicated than what you proposed, e.g. we will have to define preference.
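The reviewer's "different class for each case" suggestion could look roughly like the following Python sketch (hypothetical names and shapes, not a proposal from the thread itself): one entry type per config source, each owning its own key derivation.

```python
# Illustrative sketch only: separate entry classes per config source instead of
# one ConfEntry exposing both sqlConfFullName and lowerCaseName.
from dataclasses import dataclass

PREFIX = "spark.sql.streaming.stateStore.rocksdb"

@dataclass(frozen=True)
class SQLConfEntry:
    name: str
    default: str
    @property
    def key(self) -> str:
        # Full SQLConf key, lower-cased
        return f"{PREFIX}.{self.name}".lower()

@dataclass(frozen=True)
class ExtraOptionEntry:
    name: str
    default: str
    @property
    def key(self) -> str:
        # Bare operator-level option name, lower-cased
        return self.name.lower()

print(SQLConfEntry("compactOnCommit", "false").key)
# spark.sql.streaming.statestore.rocksdb.compactoncommit
```

Keeping the key logic inside each class is what makes the source of a config unambiguous at the type level, which is the clarity the comment asks for.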
[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng commented on PR #39068: URL: https://github.com/apache/spark/pull/39068#issuecomment-1352636404 cc @HyukjinKwon @cloud-fan @amaliujia @grundprinzip @hvanhovell
[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng commented on PR #39068: URL: https://github.com/apache/spark/pull/39068#issuecomment-1352619900 The failed `./dev/test-dependencies.sh` is irrelevant
[GitHub] [spark] jerrypeng commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on PR #38517: URL: https://github.com/apache/spark/pull/38517#issuecomment-1352612404 @HeartSaVioR thanks for the detailed review. I think I have addressed all of your comments. PTAL
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049255090 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -195,6 +200,102 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { true } + /** + * Test async progress tracking capability with Kafka source and sink + */ + test("async progress tracking") { +val inputTopic = newTopic() +testUtils.createTopic(inputTopic, partitions = 5) + +val dataSent = new ListBuffer[String]() +testUtils.sendMessages(inputTopic, (0 until 15).map { case x => + val m = s"foo-$x" + dataSent += m + m +}.toArray, Some(0)) + +val outputTopic = newTopic() +testUtils.createTopic(outputTopic, partitions = 5) + +withTempDir { dir => + val reader = spark +.readStream +.format("kafka") +.option("kafka.bootstrap.servers", testUtils.brokerAddress) +.option("kafka.metadata.max.age.ms", "1") +.option("maxOffsetsPerTrigger", 5) +.option("subscribe", inputTopic) +.option("startingOffsets", "earliest") +.load() + + def startQuery(): StreamingQuery = { +reader.writeStream + .format("kafka") + .option("checkpointLocation", dir.getCanonicalPath) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.max.block.ms", "5000") + .option("topic", outputTopic) + .option(ASYNC_PROGRESS_TRACKING_ENABLED, true) + .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 1000) + .queryName("kafkaStream") + .start() + } + + def readResults(): ListBuffer[String] = { Review Comment: ok
[GitHub] [spark] beliefer commented on pull request #38799: [SPARK-37099][SQL] Introduce the group limit of Window for rank-based filter to optimize top-k computation
beliefer commented on PR #38799: URL: https://github.com/apache/spark/pull/38799#issuecomment-1352608435 The failing GA run is unrelated to this PR.
[GitHub] [spark] huanliwang-db opened a new pull request, #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…
huanliwang-db opened a new pull request, #39069: URL: https://github.com/apache/spark/pull/39069 …eConf for its usage in RocksDBConf ### What changes were proposed in this pull request? Currently StateStoreConf is used via [confs](https://github.com/huanliwang-db/spark/blob/7671bc975f2d88ab929e4982abfe3e166fa72e35/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala#L77-L78), which composes both SQL confs and extraOptions into one map. Config names for extraOptions shouldn't have to follow the SQL conf name prefix, because they aren't bound to the SQL conf context. ### Why are the changes needed? After differentiating SQL confs and extraOptions in StateStoreConf, we should be able to support more operator-level config use cases via extraOptions. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT should cover the change
[GitHub] [spark] MaxGekk closed pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`
MaxGekk closed pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()` URL: https://github.com/apache/spark/pull/38864
[GitHub] [spark] MaxGekk closed pull request #38712: [WIP][SPARK-41271][SQL] Parameterized SQL queries
MaxGekk closed pull request #38712: [WIP][SPARK-41271][SQL] Parameterized SQL queries URL: https://github.com/apache/spark/pull/38712
[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng commented on code in PR #39068: URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906 ## python/pyspark/sql/connect/functions.py: ## @@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column: return _invoke_function(name, *_cols) +def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]: +signature = inspect.signature(f) +parameters = signature.parameters.values() + +# We should exclude functions that use +# variable args and keyword argnames +# as well as keyword only args +supported_parameter_types = { +inspect.Parameter.POSITIONAL_OR_KEYWORD, +inspect.Parameter.POSITIONAL_ONLY, +} + +# Validate that +# function arity is between 1 and 3 +if not (1 <= len(parameters) <= 3): +raise ValueError( +"f should take between 1 and 3 arguments, but provided function takes {}".format( +len(parameters) +) +) + +# and all arguments can be used as positional +if not all(p.kind in supported_parameter_types for p in parameters): +raise ValueError("f should use only POSITIONAL or POSITIONAL OR KEYWORD arguments") + +return parameters + + +def _create_lambda(f: Callable) -> LambdaFunction: +""" +Create `o.a.s.sql.expressions.LambdaFunction` corresponding +to transformation described by f + +:param f: A Python of one of the following forms: +- (Column) -> Column: ... +- (Column, Column) -> Column: ... +- (Column, Column, Column) -> Column: ... +""" +parameters = _get_lambda_parameters(f) + +arg_names = ["x", "y", "z"] + +arg_cols: List[Column] = [] +for arg in arg_names[: len(parameters)]: +# TODO: How to make sure lambda variable names are unique? RPC for increasing ID? 
Review Comment: PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in JVM to get a unique variable name ``` object UnresolvedNamedLambdaVariable { // Counter to ensure lambda variable names are unique private val nextVarNameId = new AtomicInteger(0) def freshVarName(name: String): String = { s"${name}_${nextVarNameId.getAndIncrement()}" } } ```
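The `freshVarName` scheme quoted above (an atomically incremented counter suffixing each name) can be sketched in pure Python for readers following the TODO in the diff; this is an illustrative sketch, not the Spark Connect implementation:

```python
# Illustrative sketch of freshVarName: a process-wide counter makes each
# lambda variable name unique, e.g. "x" -> "x_0", "x_1", ...
import itertools
import threading

_next_var_id = itertools.count()
_lock = threading.Lock()  # mirror AtomicInteger's thread-safety explicitly

def fresh_var_name(name: str) -> str:
    with _lock:
        return f"{name}_{next(_next_var_id)}"

print(fresh_var_name("x"))  # x_0
print(fresh_var_name("x"))  # x_1
```

For Spark Connect the open question in the TODO remains: a client-local counter guarantees uniqueness only within one client process, which is why the comment points at the JVM-side counter.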
[GitHub] [spark] MaxGekk commented on pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`
MaxGekk commented on PR #38864: URL: https://github.com/apache/spark/pull/38864#issuecomment-1352605299 Merging to master. The last commit is a minor one. Thank you, @cloud-fan @xkrogen @entong @srielau for review.
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049249895 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{File, OutputStream} +import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, TimeUnit} + +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Seconds, Span} + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.connector.read.streaming +import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED, ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK} +import org.apache.spark.sql.functions.{column, window} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.util.{Clock, Utils} + +class AsyncProgressTrackingMicroBatchExecutionSuite +extends StreamTest +with BeforeAndAfter +with Matchers { + + import testImplicits._ + + after { +sqlContext.streams.active.foreach(_.stop()) + } + + def getListOfFiles(dir: String): List[File] = { +val d = new File(dir) +if (d.exists && d.isDirectory) { + d.listFiles.filter(_.isFile).toList +} else { + List[File]() +} + } + + def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = { + assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution]) +eventually(timeout(Span(5, Seconds))) { + streamExecution +.asInstanceOf[AsyncProgressTrackingMicroBatchExecution] +.areWritesPendingOrInProgress() should be(false) +} + } + + def waitPendingPurges(streamExecution: StreamExecution): Unit = { + assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution]) +eventually(timeout(Span(5, Seconds))) { + streamExecution 
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution] +.arePendingAsyncPurge should be(false) +} + } + + // test the basic functionality i.e. happy path + test("async WAL commits happy path") { +val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + +val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) +val ds = inputData.toDF() + +val tableName = "test" + +def startQuery(): StreamingQuery = { + ds.writeStream +.format("memory") +.queryName(tableName) +.option(ASYNC_PROGRESS_TRACKING_ENABLED, true) +.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0) +.option("checkpointLocation", checkpointLocation) +.start() +} +val query = startQuery() +val expected = new ListBuffer[Row]() +for (j <- 0 until 100) { + for (i <- 0 until 10) { +val v = i + (j * 10) +inputData.addData({ v }) +expected += Row(v) + } + query.processAllAvailable() +} + +checkAnswer( + spark.table(tableName), + expected.toSeq +) + } + + test("async WAL commits recovery") { +val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + +val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) +val ds = inputData.toDF() + +var index = 0 +// to synchronize producing and consuming messages so that +// we can generate and read the desired number of batches +var countDownLatch = new CountDownLatch(10) +val sem = new Semaphore(1) +val data = new ListBuffer[Int]() +def startQuery(): StreamingQuery = { + ds.writeStream +.foreachBatch((ds: Dataset[Row], batchId: Long) => { +
[GitHub] [spark] RussellSpitzer commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
RussellSpitzer commented on code in PR #38823: URL: https://github.com/apache/spark/pull/38823#discussion_r1049247458 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java: ## @@ -93,4 +93,18 @@ default Transform[] inferPartitioning(CaseInsensitiveStringMap options) { default boolean supportsExternalMetadata() { return false; } + + /** + * Returns true if the source supports defining generated columns upon table creation in SQL. + * When false: any create/replace table statements with a generated column defined in the table + * schema will throw an exception during analysis. + * + * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + * The generation expression is stored in the column metadata with key "generationExpression". + * + * Override this method to allow defining generated columns in create/replace table statements. + */ + default boolean supportsGeneratedColumnsOnCreation() { Review Comment: This really should be part of the greater catalog capabilities, since that's where create table is usually going to be invoked. I'm very nervous about saying that it is up to the Datasource to decide what is valid, because different engines may decide the same SQL means different things, and this would require that the Datasource somehow make sure non-Spark engines can access the same table in the same way. We spent a lot of time making a public expression class for the connectors, but it feels like that should probably be invoked here as well?
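The analysis-time check described in the Javadoc above can be sketched outside Spark. This is a hypothetical Python illustration, not the actual TableProvider API: `SimpleProvider`, `check_generated_columns`, and the `(name, metadata)` schema encoding are invented; only the `generationExpression` metadata key comes from the diff.

```python
GENERATION_EXPRESSION_KEY = "generationExpression"

class SimpleProvider:
    """Stand-in for a data source that may or may not opt in."""
    def __init__(self, supports_generated: bool):
        self._supports_generated = supports_generated

    def supports_generated_columns_on_creation(self) -> bool:
        return self._supports_generated

def check_generated_columns(provider, schema):
    """schema: list of (column_name, metadata_dict) pairs. Raises during
    'analysis' when a generated column is defined but unsupported."""
    generated = [n for n, meta in schema if GENERATION_EXPRESSION_KEY in meta]
    if generated and not provider.supports_generated_columns_on_creation():
        raise ValueError(f"generated columns not supported: {generated}")
```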
[GitHub] [spark] LuciferYang commented on pull request #39067: [DON'T MERGE] Test test-dependencies
LuciferYang commented on PR #39067: URL: https://github.com/apache/spark/pull/39067#issuecomment-1352598729 This differs from the local error; further investigation is required.
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049243767 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = +"asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = +"_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { +extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, +"1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( +sparkSession: SparkSession, +trigger: Trigger, +triggerClock: Clock, +extraOptions: Map[String, String], +plan: WriteToStream) +extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution +.getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( +"async-log-write", +2, // one for offset commit and one for completion commit +new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { +try { + if (!executor.isShutdown) { +val start = System.currentTimeMillis() +executor.getQueue.put(r) +logDebug( + s"Async write paused execution for " + +s"${System.currentTimeMillis() - start} due to task queue being full." +) + } +} catch { + case e: InterruptedException => +Thread.currentThread.interrupt() +throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => +logError("Encountered error in async write executor service", e) +errorNotifier.markError(e) +} + } +}) + + override val offsetLog = new AsyncOffsetSeqLog( +sparkSession, +checkpointFile("offsets"), +asyncWritesExecutorService, +asyncProgressTrackingCheckpointingIntervalMs, +clock = triggerClock + ) + + override val commitLog = +new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { +// check if pipeline is stateful +checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { +// this is a no op for async progress tracking since we only want to commit sources only +// after the offset
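The executor configured above — a single daemon thread whose rejection handler blocks by re-queuing the task instead of dropping it — can be approximated in Python. A minimal sketch under invented names (`SerializedAsyncWriter` and its methods are not from the PR); a bounded `queue.Queue` stands in for the executor's task queue:

```python
import queue
import threading

class SerializedAsyncWriter:
    """One daemon worker thread; submissions block when the bounded task
    queue is full, giving the same backpressure as the blocking
    RejectedExecutionHandler in the review above."""

    def __init__(self, capacity: int = 2):
        self._tasks = queue.Queue(maxsize=capacity)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        while True:
            task = self._tasks.get()
            if task is None:  # shutdown sentinel
                return
            task()

    def submit(self, task) -> None:
        # queue.Queue.put blocks when the queue is at capacity, so async
        # writes stay serialized, ordered, and bounded in memory
        self._tasks.put(task)

    def shutdown(self) -> None:
        self._tasks.put(None)
        self._worker.join()
```

Because there is exactly one worker and one FIFO queue, completions are observed in submission order — the property the offset/commit log writes rely on.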
[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng commented on code in PR #39068: URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906 ## python/pyspark/sql/connect/functions.py: ## @@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: Any) -> Column: return _invoke_function(name, *_cols) +def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]: +signature = inspect.signature(f) +parameters = signature.parameters.values() + +# We should exclude functions that use +# variable args and keyword argnames +# as well as keyword only args +supported_parameter_types = { +inspect.Parameter.POSITIONAL_OR_KEYWORD, +inspect.Parameter.POSITIONAL_ONLY, +} + +# Validate that +# function arity is between 1 and 3 +if not (1 <= len(parameters) <= 3): +raise ValueError( +"f should take between 1 and 3 arguments, but provided function takes {}".format( +len(parameters) +) +) + +# and all arguments can be used as positional +if not all(p.kind in supported_parameter_types for p in parameters): +raise ValueError("f should use only POSITIONAL or POSITIONAL OR KEYWORD arguments") + +return parameters + + +def _create_lambda(f: Callable) -> LambdaFunction: +""" +Create `o.a.s.sql.expressions.LambdaFunction` corresponding +to transformation described by f + +:param f: A Python of one of the following forms: +- (Column) -> Column: ... +- (Column, Column) -> Column: ... +- (Column, Column, Column) -> Column: ... +""" +parameters = _get_lambda_parameters(f) + +arg_names = ["x", "y", "z"] + +arg_cols: List[Column] = [] +for arg in arg_names[: len(parameters)]: +# TODO: How to make sure lambda variable names are unique? RPC for increasing ID? 
Review Comment: PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in JVM to get a unique variable name https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L8157 ``` object UnresolvedNamedLambdaVariable { // Counter to ensure lambda variable names are unique private val nextVarNameId = new AtomicInteger(0) def freshVarName(name: String): String = { s"${name}_${nextVarNameId.getAndIncrement()}" } } ```
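Client-side, the two pieces under discussion — validating the lambda's arity and parameter kinds via `inspect`, and generating unique variable names from a counter — can be condensed into plain Python. This restates the quoted diff and the Scala `freshVarName` above; note the counter here is process-local, unlike the JVM-side `AtomicInteger` shared with the server.

```python
import inspect
import itertools
from typing import Callable, List

def get_lambda_parameters(f: Callable) -> List[inspect.Parameter]:
    """Validate that f takes 1 to 3 positionally-bindable arguments
    (condensed from the diff quoted above)."""
    parameters = list(inspect.signature(f).parameters.values())
    # Exclude *args, **kwargs, and keyword-only parameters
    supported = {
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.POSITIONAL_ONLY,
    }
    if not 1 <= len(parameters) <= 3:
        raise ValueError(
            "f should take between 1 and 3 arguments, "
            f"but provided function takes {len(parameters)}")
    if not all(p.kind in supported for p in parameters):
        raise ValueError(
            "f should use only POSITIONAL or POSITIONAL OR KEYWORD arguments")
    return parameters

# Process-local counter, unlike the JVM-side AtomicInteger in freshVarName
_next_var_id = itertools.count()

def fresh_var_name(name: str) -> str:
    return f"{name}_{next(_next_var_id)}"
```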
[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng commented on PR #39068: URL: https://github.com/apache/spark/pull/39068#issuecomment-1352588465 reviewers can refer to the implementation in PySpark https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L8094-L8197
[GitHub] [spark] zhengruifeng opened a new pull request, #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation
zhengruifeng opened a new pull request, #39068: URL: https://github.com/apache/spark/pull/39068 ### What changes were proposed in this pull request? There are 11 lambda functions in higher order functions; this PR adds basic support for `LambdaFunction` and adds the `array_exists` function. ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, new API ### How was this patch tested? added UT
[GitHub] [spark] dongjoon-hyun commented on pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade `kubernetes-client` to 6.3.0
dongjoon-hyun commented on PR #39065: URL: https://github.com/apache/spark/pull/39065#issuecomment-1352587300 Also, cc @Yikun
[GitHub] [spark] dongjoon-hyun closed pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade `kubernetes-client` to 6.3.0
dongjoon-hyun closed pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade `kubernetes-client` to 6.3.0 URL: https://github.com/apache/spark/pull/39065
[GitHub] [spark] dongjoon-hyun commented on pull request #39067: [DON'T MERGE] Test test-dependencies
dongjoon-hyun commented on PR #39067: URL: https://github.com/apache/spark/pull/39067#issuecomment-1352584283 Thank you for investigating this, @LuciferYang .
[GitHub] [spark] LuciferYang opened a new pull request, #39067: [DON'T MERGE] Test test-dependencies
LuciferYang opened a new pull request, #39067: URL: https://github.com/apache/spark/pull/39067 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] LuciferYang opened a new pull request, #39066: [SPARK-41523][PROTOBUF][BUILD] Change `protoc-jar-maven-plugin` definition in `user-defined-protoc` profile use `protoc-jar-maven-plugin.
LuciferYang opened a new pull request, #39066: URL: https://github.com/apache/spark/pull/39066 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] cloud-fan commented on a diff in pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`
cloud-fan commented on code in PR #38864: URL: https://github.com/apache/spark/pull/38864#discussion_r1049204162 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/parameters.scala: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, TreePattern} +import org.apache.spark.sql.errors.QueryErrorsBase +import org.apache.spark.sql.types.DataType + +/** + * The expression represents a named parameter that should be replaces by a literal. Review Comment: ```suggestion * The expression represents a named parameter that should be replaced by a literal. ```
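The substitution that parameterized `sql()` performs — walking the plan and replacing each named-parameter node with a literal — can be illustrated on a toy expression tree. A hedged sketch: the dict-based node encoding and `bind_parameters` name are invented for illustration and are not Spark's actual `Parameter` expression or analyzer rule.

```python
def bind_parameters(expr, params):
    """Replace {"type": "parameter", "name": ...} nodes with literal nodes,
    recursing through nested dicts and lists."""
    if isinstance(expr, dict) and expr.get("type") == "parameter":
        name = expr["name"]
        if name not in params:
            # Mirrors the analysis error for an unbound named parameter
            raise KeyError(f"unbound parameter: {name}")
        return {"type": "literal", "value": params[name]}
    if isinstance(expr, dict):
        return {k: bind_parameters(v, params) for k, v in expr.items()}
    if isinstance(expr, list):
        return [bind_parameters(v, params) for v in expr]
    return expr
```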
[GitHub] [spark] MrDLontheway commented on a diff in pull request #38893: [Spark-40099][SQL] Merge adjacent CaseWhen branches if their values are the same
MrDLontheway commented on code in PR #38893: URL: https://github.com/apache/spark/pull/38893#discussion_r1049199875 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -639,6 +639,26 @@ object RemoveNoopOperators extends Rule[LogicalPlan] { } } +/** + * Merge case condition expression with same value. + */ +object MergeConditionWithValue extends Rule[LogicalPlan] { Review Comment: @wangyum Any updates?
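The optimization this rule proposes — collapsing adjacent CaseWhen branches that produce the same value into one branch with an OR'd condition — can be shown on plain (condition, value) pairs. A sketch with invented names and a tuple-based condition encoding, not the actual Catalyst rule:

```python
def merge_adjacent_branches(branches):
    """branches: list of (condition, value) pairs. Adjacent branches with
    the same value collapse into a single branch whose condition is the
    OR of the originals (order-preserving, so semantics are unchanged)."""
    merged = []
    for cond, value in branches:
        if merged and merged[-1][1] == value:
            prev_cond, _ = merged[-1]
            merged[-1] = (("OR", prev_cond, cond), value)
        else:
            merged.append((cond, value))
    return merged
```

Only *adjacent* branches may be merged: reordering non-adjacent branches with equal values could change which condition fires first.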
[GitHub] [spark] cloud-fan commented on a diff in pull request #39057: [SPARK-41513][SQL] Implement an accumulator to collect per mapper row count metrics
cloud-fan commented on code in PR #39057: URL: https://github.com/apache/spark/pull/39057#discussion_r1049199171 ## core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala: ## @@ -513,3 +513,81 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { getOrCreate.addAll(newValue) } } + + +/** + * An [[AccumulatorV2 counter]] for collecting a list of (mapper id, row count). + * + * @since 3.4.0 + */ +class MapperRowCounter extends AccumulatorV2[jl.Long, java.util.List[java.util.List[jl.Long]]] { + + private var _agg: java.util.List[java.util.List[jl.Long]] = _ Review Comment: a list of tuple2[int, int]?
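The accumulator's intended semantics — each task registers its mapper id, counts its rows, and the driver-side merge concatenates the per-task lists — can be sketched as follows. The method names are loosely modeled on AccumulatorV2 (`add`, `merge`, `value`), but this is an invented illustration, not the PR's implementation:

```python
class MapperRowCounter:
    """Collects [mapper_id, row_count] pairs: each task instance holds at
    most one pair; merge() concatenates partial results on the driver."""

    def __init__(self):
        self._pairs = []

    def set_id(self, mapper_id: int) -> None:
        # Called once per task to register which mapper the counts belong to
        if not self._pairs:
            self._pairs.append([mapper_id, 0])

    def add(self, n: int = 1) -> None:
        assert len(self._pairs) == 1, "set_id must be called on this task first"
        self._pairs[0][1] += n

    def merge(self, other: "MapperRowCounter") -> None:
        self._pairs.extend(other._pairs)

    @property
    def value(self):
        return self._pairs
```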
[GitHub] [spark] cloud-fan commented on a diff in pull request #38888: [SPARK-41405][SQL] Centralize the column resolution logic
cloud-fan commented on code in PR #38888: URL: https://github.com/apache/spark/pull/38888#discussion_r1049198193 ## sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out: ## @@ -149,9 +149,9 @@ org.apache.spark.sql.AnalysisException "queryContext" : [ { Review Comment: The error class ``` "_LEGACY_ERROR_TEMP_2422" : { "message" : [ "grouping expressions sequence is empty, and '' is not an aggregate function. Wrap '' in windowing function(s) or wrap '' in first() (or first_value) if you don't care which value you get." ] }, ``` The query context becomes more accurate actually.
[GitHub] [spark] SandishKumarHN commented on a diff in pull request #39039: [SPARK-40776][SQL][PROTOBUF][DOCS] Spark-Protobuf docs
SandishKumarHN commented on code in PR #39039: URL: https://github.com/apache/spark/pull/39039#discussion_r1049183578 ## docs/sql-data-sources-protobuf.md: ## @@ -0,0 +1,301 @@ +--- Review Comment: @rangadi it appears to be the same for spark-avro as well, https://github.com/apache/spark/blob/master/docs/sql-data-sources-avro.md#to_avro-and-from_avro. It will be rendered as a webpage rather than a readme file. spark-avro documentation can be found at https://spark.apache.org/docs/latest/sql-data-sources-avro.html. not sure how to test
[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
SandishKumarHN commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1049175487 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) && Review Comment: @rangadi thanks for the review, I have made all the changes you suggested.
[GitHub] [spark] dongjoon-hyun commented on pull request #39065: [SPARK-41521][BUILD] Upgrade `kubernetes-client` to 6.3.0
dongjoon-hyun commented on PR #39065: URL: https://github.com/apache/spark/pull/39065#issuecomment-1352481238 BTW, there exist breaking changes. Does this pass in your GitHub Action?
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1049157233 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) && Review Comment: Scratch the above suggestion. Instead you could add 'else' to what you have and remove 'return'. That is simpler.
[GitHub] [spark] dengziming commented on pull request #39059: [SPARK-41506][CONNECT][MINOR]: small fix in python code
dengziming commented on PR #39059: URL: https://github.com/apache/spark/pull/39059#issuecomment-1352477957 Thank you @zhengruifeng @srowen , The response time of Spark project is so short, awesome.
[GitHub] [spark] beliefer commented on pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`
beliefer commented on PR #39035: URL: https://github.com/apache/spark/pull/39035#issuecomment-1352465823 @zhengruifeng @amaliujia Thank you for all !
[GitHub] [spark] panbingkun opened a new pull request, #39065: [SPARK-41521][BUILD] Upgrade fabric8io - kubernetes-client from 6.2.0 to 6.3.0
panbingkun opened a new pull request, #39065: URL: https://github.com/apache/spark/pull/39065 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA.
[GitHub] [spark] gengliangwang commented on a diff in pull request #39048: [SPARK-41424][UI] Protobuf serializer for TaskDataWrapper
gengliangwang commented on code in PR #39048: URL: https://github.com/apache/spark/pull/39048#discussion_r1049145069 ## core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.protobuf + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.status.TaskDataWrapper +import org.apache.spark.status.api.v1.AccumulableInfo +import org.apache.spark.status.protobuf.Utils.getOptional + +object TaskDataWrapperSerializer { + def serialize(input: TaskDataWrapper): Array[Byte] = { +val builder = StoreTypes.TaskDataWrapper.newBuilder() + .setTaskId(input.taskId) + .setIndex(input.index) + .setAttempt(input.attempt) + .setPartitionId(input.partitionId) + .setLaunchTime(input.launchTime) + .setResultFetchStart(input.resultFetchStart) + .setDuration(input.duration) + .setExecutorId(input.executorId) + .setHost(input.host) + .setStatus(input.status) + .setTaskLocality(input.taskLocality) + .setSpeculative(input.speculative) + .setHasMetrics(input.hasMetrics) + .setExecutorDeserializeTime(input.executorDeserializeTime) + .setExecutorDeserializeCpuTime(input.executorDeserializeCpuTime) + .setExecutorRunTime(input.executorRunTime) + .setExecutorCpuTime(input.executorCpuTime) + .setResultSize(input.resultSize) + .setJvmGcTime(input.jvmGcTime) + .setResultSerializationTime(input.resultSerializationTime) + .setMemoryBytesSpilled(input.memoryBytesSpilled) + .setDiskBytesSpilled(input.diskBytesSpilled) + .setPeakExecutionMemory(input.peakExecutionMemory) + .setInputBytesRead(input.inputBytesRead) + .setInputRecordsRead(input.inputRecordsRead) + .setOutputBytesWritten(input.outputBytesWritten) + .setOutputRecordsWritten(input.outputRecordsWritten) + .setShuffleRemoteBlocksFetched(input.shuffleRemoteBlocksFetched) + .setShuffleLocalBlocksFetched(input.shuffleLocalBlocksFetched) + .setShuffleFetchWaitTime(input.shuffleFetchWaitTime) + .setShuffleRemoteBytesRead(input.shuffleRemoteBytesRead) + .setShuffleRemoteBytesReadToDisk(input.shuffleRemoteBytesReadToDisk) + .setShuffleLocalBytesRead(input.shuffleLocalBytesRead) + .setShuffleRecordsRead(input.shuffleRecordsRead) + 
.setShuffleBytesWritten(input.shuffleBytesWritten) + .setShuffleWriteTime(input.shuffleWriteTime) + .setShuffleRecordsWritten(input.shuffleRecordsWritten) + .setStageId(input.stageId) + .setStageAttemptId(input.stageAttemptId) +input.errorMessage.foreach(builder.setErrorMessage) +input.accumulatorUpdates.foreach { update => + builder.addAccumulatorUpdates(serializeAccumulableInfo(update)) +} +builder.build().toByteArray + } + + def deserialize(bytes: Array[Byte]): TaskDataWrapper = { +val binary = StoreTypes.TaskDataWrapper.parseFrom(bytes) +val accumulatorUpdates = new ArrayBuffer[AccumulableInfo]() +binary.getAccumulatorUpdatesList.forEach { update => + accumulatorUpdates.append(new AccumulableInfo( +id = update.getId, +name = update.getName, +update = getOptional(update.hasUpdate, update.getUpdate), +value = update.getValue)) +} +new TaskDataWrapper( + taskId = binary.getTaskId, + index = binary.getIndex, + attempt = binary.getAttempt, + partitionId = binary.getPartitionId, + launchTime = binary.getLaunchTime, + resultFetchStart = binary.getResultFetchStart, + duration = binary.getDuration, + executorId = binary.getExecutorId, + host = binary.getHost, + status = binary.getStatus, + taskLocality = binary.getTaskLocality, + speculative = binary.getSpeculative, + accumulatorUpdates = accumulatorUpdates.toSeq, + errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage), + hasMetrics = binary.getHasMetrics, + executorDeserializeTime = binary.getExecutorDeserializeTime, + executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTime, + executorRunTime = binary.getExe
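The deserializer above maps each protobuf `hasX`/`getX` accessor pair onto an `Option` field through the imported `Utils.getOptional`. A minimal stand-in consistent with those call sites is sketched below; the `RawTask` type is invented for illustration, and Spark's actual helper may differ in detail:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Minimal stand-in for the getOptional helper used above. Protobuf scalar
// fields expose a hasX flag plus a getX accessor; this maps the pair onto
// Optional. The Supplier keeps getX from being evaluated when the field
// is absent. (Sketch only; not Spark's real Utils class.)
public class GetOptionalDemo {
    static <T> Optional<T> getOptional(boolean isDefined, Supplier<T> value) {
        return isDefined ? Optional.of(value.get()) : Optional.empty();
    }

    // Hypothetical deserialized message with an optional error message.
    static final class RawTask {
        final boolean hasErrorMessage;
        final String errorMessage;
        RawTask(boolean hasErrorMessage, String errorMessage) {
            this.hasErrorMessage = hasErrorMessage;
            this.errorMessage = errorMessage;
        }
    }

    public static void main(String[] args) {
        RawTask ok = new RawTask(false, "");
        RawTask failed = new RawTask(true, "OOM");
        // Absent field maps to Optional.empty, present field to Optional.of.
        System.out.println(getOptional(ok.hasErrorMessage, () -> ok.errorMessage));
        System.out.println(getOptional(failed.hasErrorMessage, () -> failed.errorMessage));
    }
}
```

This mirrors how the serializer above writes `errorMessage` only when defined (`input.errorMessage.foreach(builder.setErrorMessage)`) and reads it back with `getOptional(binary.hasErrorMessage, binary.getErrorMessage)`.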
[GitHub] [spark] kelvinjian-db opened a new pull request, #39064: [SPARK-41520] Split AND_OR TreePattern to separate AND and OR TreePatterns
kelvinjian-db opened a new pull request, #39064: URL: https://github.com/apache/spark/pull/39064 ### What changes were proposed in this pull request? Removed `AND_OR` TreePattern, replaced with separate `AND` TreePattern and `OR` TreePattern. ### Why are the changes needed? This way we can be more fine-grained with how we match for AND/OR patterns, as many times we want to treat them separately. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No tests were added as there are not any existing unit tests for TreePatterns. Instead, I just made sure the existing relevant tests (e.g. `BooleanSimplificationSuite`) passed.
[GitHub] [spark] zhengruifeng commented on pull request #38956: [SPARK-41319][CONNECT][PYTHON] Implement Column.{when, otherwise} and Function `when` with `UnresolvedFunction`
zhengruifeng commented on PR #38956: URL: https://github.com/apache/spark/pull/38956#issuecomment-1352461291 cc @HyukjinKwon @cloud-fan @amaliujia
[GitHub] [spark] zhengruifeng commented on pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`
zhengruifeng commented on PR #39035: URL: https://github.com/apache/spark/pull/39035#issuecomment-1352460909 merged into master
[GitHub] [spark] zhengruifeng closed pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`
zhengruifeng closed pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex` URL: https://github.com/apache/spark/pull/39035
[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
rangadi commented on code in PR #38922: URL: https://github.com/apache/spark/pull/38922#discussion_r1049126074 ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields. Review Comment: '-1' implies recursive fields are not allowed ("disables" does not clearly imply that it will be an error). ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields. + val circularReferenceDepth: Int = parameters.getOrElse("circularReferenceDepth", "-1").toInt Review Comment: Suggestion for renaming this option: _"recursive.fields.max.depth"_. _circularReferenceDepth_ sounds very much like a code variable name.
## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) && + protobufOptions.circularReferenceDepth < 0 ) { throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) +} else if (existingRecordNames.contains(recordName) && + existingRecordNames.getOrElse(recordName, 0) +> protobufOptions.circularReferenceDepth) { + return Some(StructField(fd.getName, NullType, nullable = false)) Review Comment: Why is nullable false? ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala: ## @@ -38,6 +38,12 @@ private[sql] class ProtobufOptions( val parseMode: ParseMode = parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode) + + // Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting + // it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed + // thrice. circularReferenceDepth value greater than 2 is not allowed. If not + // specified, it will default to -1, which disables recursive fields. Review Comment: Also warn that if the protobuf record has more depth for recursive fields than allowed here, it will be truncated to the allowed depth. This implies some fields are discarded from the record.
Could you add a simple example in the comment showing the resulting spark schema when this is set to '0' and '2'. ## connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala: ## @@ -92,14 +106,26 @@ object SchemaConverters { MapType(keyType, valueType, valueContainsNull = false).defaultConcreteType, nullable = false)) case MESSAGE => -if (existingRecordNames.contains(fd.getFullName)) { +// Setting the circularReferenceDepth to 0 allows the field to be recursed once, setting +// it to 1 allows it to be recursed twice, and setting it to 2 allows it to be recursed +// thrice. circularReferenceDepth value greater than 2 is not allowed. If not +// specified, it will default to -1, which disables recursive fields. +val recordName = fd.getMessageType.getFullName +if (existingRecordNames.contains(recordName) && Review Comment: Better to remove the 'return' statement. How about: val recursiveDepth = existingRecordNames.ge
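The depth semantics being discussed can be shown with a tiny standalone model. Nothing below is Spark code; the `Node` message and `toSchema` function are invented for illustration of the proposed check (expand a recursive message while its per-name count stays within the limit, truncate to NULL once the limit is exceeded, and reject recursion outright when the limit is -1):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the depth-limited recursion check under discussion.
// maxDepth = -1 reproduces the old behavior (recursive schemas rejected);
// maxDepth = 0 lets a recursive field appear once before it is truncated
// to NULL; maxDepth = 1 twice; and so on.
public class RecursionDemo {
    // Toy recursive message: message Node { int32 value = 1; Node next = 2; }
    static String toSchema(String msgName, Map<String, Integer> seen, int maxDepth) {
        List<String> fields = new ArrayList<>();
        fields.add("value: INT");
        int depth = seen.getOrDefault(msgName, 0);
        if (seen.containsKey(msgName) && maxDepth < 0) {
            throw new IllegalArgumentException("Found recursive reference: " + msgName);
        } else if (seen.containsKey(msgName) && depth > maxDepth) {
            fields.add("next: NULL"); // recursion limit reached: truncate the field
        } else {
            Map<String, Integer> nested = new HashMap<>(seen);
            nested.put(msgName, depth + 1);
            fields.add("next: " + toSchema(msgName, nested, maxDepth));
        }
        return "STRUCT<" + String.join(", ", fields) + ">";
    }

    public static void main(String[] args) {
        // With the limit at 0, the recursive field appears once, then NULL:
        // STRUCT<value: INT, next: STRUCT<value: INT, next: NULL>>
        System.out.println(toSchema("Node", new HashMap<>(), 0));
        // With the limit at 2, it nests two levels deeper before truncation.
        System.out.println(toSchema("Node", new HashMap<>(), 2));
    }
}
```

Calling `toSchema("Node", new HashMap<>(), -1)` throws, matching the `foundRecursionInProtobufSchema` error path in the quoted diff.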
[GitHub] [spark] allisonport-db commented on pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
allisonport-db commented on PR #38823: URL: https://github.com/apache/spark/pull/38823#issuecomment-1352455011 > Does the SQL standard say anything about the restrictions of the generate expression? Can we allow `GENERATED AS rand()`? I think at least catalyst should have a rule to check the expression and make sure the data type is the same as column type. Checking for data type makes sense. Other restrictions we can enforce:
- no UDFs
- only deterministic functions
- no subqueries
- no window functions
- no aggregate functions
- no generator functions
- interdependence with other generated columns
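The restriction list above boils down to a single recursive walk over the generation expression. A toy version of such a rule is sketched below; the `Expr` tree is invented for this sketch, whereas Spark's real rule would pattern-match Catalyst expressions instead:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Toy analysis rule: walk a generation expression and reject constructs
// from the restriction list (UDFs, non-deterministic or aggregate
// functions, subqueries, references to other generated columns).
public class GeneratedColumnCheck {
    enum Kind { COLUMN_REF, LITERAL, FUNC, UDF, AGGREGATE, NON_DETERMINISTIC, SUBQUERY }

    static final class Expr {
        final Kind kind; final String name; final List<Expr> children;
        Expr(Kind kind, String name, Expr... children) {
            this.kind = kind; this.name = name; this.children = Arrays.asList(children);
        }
    }

    /** Returns an error message, or Optional.empty() when the expression is allowed. */
    static Optional<String> check(Expr e, Set<String> generatedCols) {
        switch (e.kind) {
            case COLUMN_REF:
                if (generatedCols.contains(e.name)) {
                    return Optional.of("references generated column " + e.name);
                }
                break;
            case UDF: return Optional.of("UDF " + e.name + " not allowed");
            case AGGREGATE: return Optional.of("aggregate " + e.name + " not allowed");
            case NON_DETERMINISTIC: return Optional.of("non-deterministic " + e.name + " not allowed");
            case SUBQUERY: return Optional.of("subqueries not allowed");
            default: break;
        }
        for (Expr child : e.children) {            // recurse into arguments
            Optional<String> err = check(child, generatedCols);
            if (err.isPresent()) return err;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // GENERATED ALWAYS AS (length(name)) -> allowed
        Expr ok = new Expr(Kind.FUNC, "length", new Expr(Kind.COLUMN_REF, "name"));
        // GENERATED ALWAYS AS (rand()) -> rejected
        Expr bad = new Expr(Kind.NON_DETERMINISTIC, "rand");
        System.out.println(check(ok, Set.of("total")));
        System.out.println(check(bad, Set.of("total")));
    }
}
```

A data-type check would sit alongside this walk, comparing the resolved expression type against the declared column type, as the quoted comment suggests.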
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
HeartSaVioR commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049119941 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -195,6 +200,102 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { true } + /** + * Test async progress tracking capability with Kafka source and sink + */ + test("async progress tracking") { +val inputTopic = newTopic() +testUtils.createTopic(inputTopic, partitions = 5) + +val dataSent = new ListBuffer[String]() +testUtils.sendMessages(inputTopic, (0 until 15).map { case x => + val m = s"foo-$x" + dataSent += m + m +}.toArray, Some(0)) + +val outputTopic = newTopic() +testUtils.createTopic(outputTopic, partitions = 5) + +withTempDir { dir => + val reader = spark +.readStream +.format("kafka") +.option("kafka.bootstrap.servers", testUtils.brokerAddress) +.option("kafka.metadata.max.age.ms", "1") +.option("maxOffsetsPerTrigger", 5) +.option("subscribe", inputTopic) +.option("startingOffsets", "earliest") +.load() + + def startQuery(): StreamingQuery = { +reader.writeStream + .format("kafka") + .option("checkpointLocation", dir.getCanonicalPath) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.max.block.ms", "5000") + .option("topic", outputTopic) + .option(ASYNC_PROGRESS_TRACKING_ENABLED, true) + .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 1000) + .queryName("kafkaStream") + .start() + } + + def readResults(): ListBuffer[String] = { Review Comment: The output would be same but the code and actual execution would be much simpler in batch query. 
See below code when we just go with batch query:

```
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  .option("startingOffsets", "earliest")
  .option("subscribe", outputTopic)
  .load()
  .selectExpr("CAST(value AS string)")
  .as[String]
  .collect()
```

The entire code in the method can be replaced with this query. Haven't given it a try, but the actual code that executes won't be much different.
[GitHub] [spark] rangadi commented on a diff in pull request #39039: [SPARK-40776][SQL][PROTOBUF][DOCS] Spark-Protobuf docs
rangadi commented on code in PR #39039: URL: https://github.com/apache/spark/pull/39039#discussion_r1049120692 ## docs/sql-data-sources-protobuf.md: ## @@ -0,0 +1,301 @@ +--- Review Comment: Could you check the formatting. It seems to require fixes. Sample from the GitHub view of this page below. It is not rendered correctly: https://user-images.githubusercontent.com/502522/207749158-8990d41c-4d60-411d-913f-232b432df2de.png
[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
allisonport-db commented on code in PR #38823: URL: https://github.com/apache/spark/pull/38823#discussion_r1049104281 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java: ## @@ -93,4 +93,18 @@ default Transform[] inferPartitioning(CaseInsensitiveStringMap options) { default boolean supportsExternalMetadata() { return false; } + + /** + * Returns true if the source supports defining generated columns upon table creation in SQL. + * When false: any create/replace table statements with a generated column defined in the table + * schema will throw an exception during analysis. + * + * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + * The generation expression is stored in the column metadata with key "generationExpression". + * + * Override this method to allow defining generated columns in create/replace table statements. + */ + default boolean supportsGeneratedColumnsOnCreation() { Review Comment: @sunchao `TableCapability` has the same issue described above as including it in `Table` @RussellSpitzer I think it would be up to the implementing data source to restrict what's supported?
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049102400 ## core/src/main/scala/org/apache/spark/util/ThreadUtils.scala: ## @@ -167,6 +167,27 @@ private[spark] object ThreadUtils { Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor] } + /** + * Wrapper over newSingleThreadExecutor that allows the specification + * of a RejectedExecutionHandler + */ + def newDaemonSingleThreadExecutorWithRejectedExecutionHandler( +threadName: String, Review Comment: ok
[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
allisonport-db commented on code in PR #38823: URL: https://github.com/apache/spark/pull/38823#discussion_r1049102169 ## sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala: ## @@ -171,6 +171,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, ctas = false) + + if (GeneratedColumn.hasGeneratedColumns(c.tableSchema) && Review Comment: Is there a reason why we match `ResolvedV1Identifier` here instead of `ResolvedV1TableIdentifier`? I think we would want to add the API to `TableCatalog` and not `CatalogPlugin` right?
[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
allisonport-db commented on code in PR #38823: URL: https://github.com/apache/spark/pull/38823#discussion_r1049100728 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java: ## @@ -93,4 +93,18 @@ default Transform[] inferPartitioning(CaseInsensitiveStringMap options) { default boolean supportsExternalMetadata() { return false; } + + /** + * Returns true if the source supports defining generated columns upon table creation in SQL. + * When false: any create/replace table statements with a generated column defined in the table + * schema will throw an exception during analysis. + * + * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + * The generation expression is stored in the column metadata with key "generationExpression". + * + * Override this method to allow defining generated columns in create/replace table statements. + */ + default boolean supportsGeneratedColumnsOnCreation() { Review Comment: Couldn't this be an issue though if we add the syntax to another statement (for example alter table add column) and the catalog/data source (whatever we land on) hasn't updated their implementation yet to accommodate it?
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049023076 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{File, OutputStream} +import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, TimeUnit} + +import scala.collection.mutable.ListBuffer + +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Seconds, Span} + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.connector.read.streaming +import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED, ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK} +import org.apache.spark.sql.functions.{column, window} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.util.{Clock, Utils} + +class AsyncProgressTrackingMicroBatchExecutionSuite +extends StreamTest +with BeforeAndAfter +with Matchers { + + import testImplicits._ + + after { +sqlContext.streams.active.foreach(_.stop()) + } + + def getListOfFiles(dir: String): List[File] = { +val d = new File(dir) +if (d.exists && d.isDirectory) { + d.listFiles.filter(_.isFile).toList +} else { + List[File]() +} + } + + def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = { + assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution]) +eventually(timeout(Span(5, Seconds))) { + streamExecution +.asInstanceOf[AsyncProgressTrackingMicroBatchExecution] +.areWritesPendingOrInProgress() should be(false) +} + } + + def waitPendingPurges(streamExecution: StreamExecution): Unit = { + assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution]) +eventually(timeout(Span(5, Seconds))) { + streamExecution 
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution] +.arePendingAsyncPurge should be(false) +} + } + + // test the basic functionality i.e. happy path + test("async WAL commits happy path") { +val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + +val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) +val ds = inputData.toDF() + +val tableName = "test" + +def startQuery(): StreamingQuery = { + ds.writeStream +.format("memory") +.queryName(tableName) +.option(ASYNC_PROGRESS_TRACKING_ENABLED, true) +.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0) +.option("checkpointLocation", checkpointLocation) +.start() +} +val query = startQuery() +val expected = new ListBuffer[Row]() +for (j <- 0 until 100) { + for (i <- 0 until 10) { +val v = i + (j * 10) +inputData.addData({ v }) +expected += Row(v) + } + query.processAllAvailable() +} + +checkAnswer( + spark.table(tableName), + expected.toSeq +) + } + + test("async WAL commits recovery") { +val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + +val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) +val ds = inputData.toDF() + +var index = 0 +// to synchronize producing and consuming messages so that +// we can generate and read the desired number of batches +var countDownLatch = new CountDownLatch(10) +val sem = new Semaphore(1) +val data = new ListBuffer[Int]() +def startQuery(): StreamingQuery = { + ds.writeStream +.foreachBatch((ds: Dataset[Row], batchId: Long) => { +
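The recovery test quoted above coordinates the data producer with the `foreachBatch` consumer via a `CountDownLatch` (plus a `Semaphore`) so the test generates exactly the number of batches it later reads back. A generic sketch of the latch half of that handshake, independent of Spark (the names below are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchSyncSketch {
    public static void main(String[] args) throws InterruptedException {
        int batches = 10;
        CountDownLatch done = new CountDownLatch(batches);
        AtomicInteger processed = new AtomicInteger();

        // The consumer thread stands in for the foreachBatch sink: it counts
        // the latch down once per "batch" it handles.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < batches; i++) {
                processed.incrementAndGet();
                done.countDown();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // The producer (the test body) blocks here until exactly `batches`
        // batches have been handled before making its assertions.
        done.await();
        System.out.println("batches processed: " + processed.get());
    }
}
```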
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049099509 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049096226 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049095786 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##
[GitHub] [spark] github-actions[bot] commented on pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge
github-actions[bot] commented on PR #37711: URL: https://github.com/apache/spark/pull/37711#issuecomment-1352393959 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049095149 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049094888 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049093714

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.io.{File, OutputStream}
import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, TimeUnit}

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Seconds, Span}

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.WriteToStream
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED, ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
import org.apache.spark.sql.functions.{column, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.util.{Clock, Utils}

class AsyncProgressTrackingMicroBatchExecutionSuite
  extends StreamTest
  with BeforeAndAfter
  with Matchers {

  import testImplicits._

  after {
    sqlContext.streams.active.foreach(_.stop())
  }

  def getListOfFiles(dir: String): List[File] = {
    val d = new File(dir)
    if (d.exists && d.isDirectory) {
      d.listFiles.filter(_.isFile).toList
    } else {
      List[File]()
    }
  }

  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
    eventually(timeout(Span(5, Seconds))) {
      streamExecution
        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
        .areWritesPendingOrInProgress() should be(false)
    }
  }

  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
    eventually(timeout(Span(5, Seconds))) {
      streamExecution
        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
        .arePendingAsyncPurge should be(false)
    }
  }

  // test the basic functionality i.e. happy path
  test("async WAL commits happy path") {
    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
    val ds = inputData.toDF()

    val tableName = "test"

    def startQuery(): StreamingQuery = {
      ds.writeStream
        .format("memory")
        .queryName(tableName)
        .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
        .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
        .option("checkpointLocation", checkpointLocation)
        .start()
    }
    val query = startQuery()
    val expected = new ListBuffer[Row]()
    for (j <- 0 until 100) {
      for (i <- 0 until 10) {
        val v = i + (j * 10)
        inputData.addData({ v })
        expected += Row(v)
      }
      query.processAllAvailable()
    }

    checkAnswer(
      spark.table(tableName),
      expected.toSeq
    )
  }

  test("async WAL commits recovery") {
    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
    val ds = inputData.toDF()

    var index = 0
    // to synchronize producing and consuming messages so that
    // we can generate and read the desired number of batches
    var countDownLatch = new CountDownLatch(10)
    val sem = new Semaphore(1)
    val data = new ListBuffer[Int]()
    def startQuery(): StreamingQuery = {
      ds.writeStream
        .foreachBatch((ds: Dataset[Row], batchId: Long) => {
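The `waitPendingOffsetWrites`/`waitPendingPurges` helpers in the quoted suite poll a condition with ScalaTest's `eventually(timeout(Span(5, Seconds)))` until the async writes drain. For readers unfamiliar with that idiom, a minimal standalone sketch of the same poll-until-true pattern (plain JDK, not ScalaTest; the class and method names here are illustrative, not from the PR):

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class Eventually {
    // Polls `condition` until it returns true or `timeoutMs` elapses,
    // roughly what eventually(timeout(...)) { ... } does in the wait helpers above.
    public static void eventually(long timeoutMs, BooleanSupplier condition)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                throw new TimeoutException("condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(10); // back off briefly between checks
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Condition flips to true after ~100 ms, well inside the 5 s budget.
        eventually(5000, () -> System.currentTimeMillis() - start > 100);
        System.out.println("condition met");
    }
}
```

The point of polling (rather than sleeping a fixed interval) is that the test finishes as soon as the background writes drain, while still bounding how long a hung write can stall the suite.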
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049093235

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049092707

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@
[GitHub] [spark] HyukjinKwon closed pull request #39056: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept `tuple` and `dict`
HyukjinKwon closed pull request #39056: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept `tuple` and `dict` URL: https://github.com/apache/spark/pull/39056 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #39056: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept `tuple` and `dict`
HyukjinKwon commented on PR #39056: URL: https://github.com/apache/spark/pull/39056#issuecomment-1352388686 Merged to master.
[GitHub] [spark] HyukjinKwon closed pull request #39060: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null
HyukjinKwon closed pull request #39060: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null URL: https://github.com/apache/spark/pull/39060
[GitHub] [spark] HyukjinKwon commented on pull request #39060: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null
HyukjinKwon commented on PR #39060: URL: https://github.com/apache/spark/pull/39060#issuecomment-1352384703 Merged to master.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
HeartSaVioR commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049083624

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ## @@ -0,0 +1,282 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.streaming.WriteToStream
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.{Clock, ThreadUtils}

object AsyncProgressTrackingMicroBatchExecution {
  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
    "asyncProgressTrackingCheckpointIntervalMs"

  // for testing purposes
  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
    "_asyncProgressTrackingOverrideSinkSupportCheck"

  private def getAsyncProgressTrackingCheckpointingIntervalMs(
      extraOptions: Map[String, String]): Long = {
    extraOptions
      .getOrElse(
        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
        "1000"
      )
      .toLong
  }
}

/**
 * Class to execute micro-batches when async progress tracking is enabled
 */
class AsyncProgressTrackingMicroBatchExecution(
    sparkSession: SparkSession,
    trigger: Trigger,
    triggerClock: Clock,
    extraOptions: Map[String, String],
    plan: WriteToStream)
  extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {

  protected val asyncProgressTrackingCheckpointingIntervalMs: Long =
    AsyncProgressTrackingMicroBatchExecution
      .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)

  // Offsets that are ready to be committed by the source.
  // This is needed so that we can call source commit in the same thread as micro-batch execution
  // to be thread safe
  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()

  // to cache the batch id of the last batch written to storage
  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)

  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()

  private var isFirstBatch: Boolean = true

  // thread pool is only one thread because we want offset
  // writes to execute in order in a serialized fashion
  protected val asyncWritesExecutorService =
    ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
      "async-log-write",
      2, // one for offset commit and one for completion commit
      new RejectedExecutionHandler() {
        override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
          try {
            if (!executor.isShutdown) {
              val start = System.currentTimeMillis()
              executor.getQueue.put(r)
              logDebug(
                s"Async write paused execution for " +
                  s"${System.currentTimeMillis() - start} due to task queue being full."
              )
            }
          } catch {
            case e: InterruptedException =>
              Thread.currentThread.interrupt()
              throw new RejectedExecutionException("Producer interrupted", e)
            case e: Throwable =>
              logError("Encountered error in async write executor service", e)
              errorNotifier.markError(e)
          }
        }
      })

  override val offsetLog = new AsyncOffsetSeqLog(
    sparkSession,
    checkpointFile("offsets"),
    asyncWritesExecutorService,
    asyncProgressTrackingCheckpointingIntervalMs,
    clock = triggerClock
  )

  override val commitLog =
    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)

  override def markMicroBatchExecutionStart(): Unit = {
    // check if pipeline is stateful
    checkNotStatefulPipeline
  }

  override def cleanUpLastExecutedMicroBatch(): Unit = {
    // this is a no op for async progress tracking since we only want to commit sources only
    // after the offse
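The quoted executor combines a single worker thread (so offset and commit log writes run in submission order) with a `RejectedExecutionHandler` that blocks the submitter by re-queuing the task instead of throwing, i.e. backpressure rather than dropped writes. A minimal standalone sketch of that pattern using plain `java.util.concurrent` (this is not Spark's `ThreadUtils` helper; the class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OrderedAsyncWriter {
    // Runs n "log writes" on a single worker with a bounded queue of 2
    // (one slot for an offset commit, one for a completion commit, as in the
    // quoted executor) and returns the order in which they executed.
    static List<Integer> runBatches(int n) throws InterruptedException {
        // When the queue is full, block the submitting thread instead of
        // throwing, so the caller is paused rather than losing the write.
        RejectedExecutionHandler blockWhenFull = (Runnable r, ThreadPoolExecutor ex) -> {
            try {
                if (!ex.isShutdown()) {
                    ex.getQueue().put(r); // blocks until a queue slot frees up
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("submitter interrupted", e);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, // exactly one worker => serialized writes
            new ArrayBlockingQueue<>(2), blockWhenFull);

        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int batchId = i;
            pool.execute(() -> {
                synchronized (order) { order.add(batchId); }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // Single worker + FIFO queue + blocking handler preserves submission order.
        System.out.println(runBatches(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The queue bound matters: with an unbounded queue a slow checkpoint store would let pending log writes pile up without limit, whereas blocking the submitter ties write submission to how fast the store can actually absorb them.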
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049079507

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049079011

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:

@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED, ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+  extends StreamTest
+  with BeforeAndAfter
+  with Matchers {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isFile).toList
+    } else {
+      List[File]()
+    }
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+    eventually(timeout(Span(5, Seconds))) {
+      streamExecution
+        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+        .areWritesPendingOrInProgress() should be(false)
+    }
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+    eventually(timeout(Span(5, Seconds))) {
+      streamExecution
+        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+        .arePendingAsyncPurge should be(false)
+    }
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDF()
+
+    val tableName = "test"
+
+    def startQuery(): StreamingQuery = {
+      ds.writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+        .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+        .option("checkpointLocation", checkpointLocation)
+        .start()
+    }
+    val query = startQuery()
+    val expected = new ListBuffer[Row]()
+    for (j <- 0 until 100) {
+      for (i <- 0 until 10) {
+        val v = i + (j * 10)
+        inputData.addData({ v })
+        expected += Row(v)
+      }
+      query.processAllAvailable()
+    }
+
+    checkAnswer(
+      spark.table(tableName),
+      expected.toSeq
+    )
+  }
+
+  test("async WAL commits recovery") {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDF()
+
+    var index = 0
+    // to synchronize producing and consuming messages so that
+    // we can generate and read the desired number of batches
+    var countDownLatch = new CountDownLatch(10)
+    val sem = new Semaphore(1)
+    val data = new ListBuffer[Int]()
+    def startQuery(): StreamingQuery = {
+      ds.writeStream
+        .foreachBatch((ds: Dataset[Row], batchId: Long) => {
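The recovery test quoted above coordinates batch production and consumption with a `CountDownLatch` and a `Semaphore` so the suite can generate and read an exact number of batches. A minimal Python sketch of that producer–consumer hand-off, with hypothetical names (this is an illustration of the synchronization pattern, not Spark or test code):

```python
import threading

def run_batches(n_batches: int) -> int:
    """Produce n_batches, letting a consumer thread drain them one at a time."""
    turn = threading.Semaphore(0)   # gates the consumer: one permit per published batch
    done = threading.Event()        # set once every batch has been consumed
    processed = []

    def consumer() -> None:
        for batch_id in range(n_batches):
            turn.acquire()              # wait for the producer to publish a batch
            processed.append(batch_id)  # "process" the batch
        done.set()                      # analogous to the latch reaching zero

    t = threading.Thread(target=consumer)
    t.start()
    for _ in range(n_batches):
        turn.release()                  # publish one batch to the consumer
    done.wait()                         # block until the consumer has drained everything
    t.join()
    return len(processed)

print(run_batches(10))  # -> 10
```

The Scala test uses the semaphore to pace `foreachBatch` callbacks and the latch to know when the expected number of batches has been committed; the sketch collapses both roles into a semaphore plus an event for brevity.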
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049077958 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049076380 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049075731 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049075469 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049074121 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049073555 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049071928 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049071823 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049071577 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049070974 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049070725 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049070590 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala ## @@ -0,0 +1,1865 @@
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
HeartSaVioR commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049070447

## core/src/main/scala/org/apache/spark/util/ThreadUtils.scala ##
@@ -167,6 +167,27 @@ private[spark] object ThreadUtils {
     Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }

+  /**
+   * Wrapper over newSingleThreadExecutor that allows the specification
+   * of a RejectedExecutionHandler
+   */
+  def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    threadName: String,

Review Comment:
Not really. Please note that params in method call and params in method definition have different indentation. 2 spaces for former, 4 spaces for latter. https://github.com/databricks/scala-style-guide#indent

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
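The indentation rule HeartSaVioR cites (from the Databricks Scala style guide) can be sketched as follows; the second parameter and the call-site arguments are hypothetical, since the quoted diff shows only the first parameter:

```scala
// Method DEFINITION: multi-line parameters are indented 4 spaces
// past the start of the `def` line.
def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
    threadName: String,
    rejectedExecutionHandler: RejectedExecutionHandler): ThreadPoolExecutor = {
  // ... implementation elided ...
  ???
}

// Method CALL: multi-line arguments are indented only 2 spaces.
val executor = newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
  "async-progress-tracking-thread",        // hypothetical thread name
  new ThreadPoolExecutor.DiscardPolicy())  // hypothetical handler choice
```

The asymmetry makes a definition's parameter list visually distinct from the method body that follows it, which both start further left at call sites.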
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1049069404 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ## @@ -0,0 +1,1865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, ASYNC_PROGRESS_TRACKING_ENABLED, ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+  extends StreamTest
+  with BeforeAndAfter
+  with Matchers {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+    val d = new File(dir)
+    if (d.exists && d.isDirectory) {
+      d.listFiles.filter(_.isFile).toList
+    } else {
+      List[File]()
+    }
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+    eventually(timeout(Span(5, Seconds))) {
+      streamExecution
+        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+        .areWritesPendingOrInProgress() should be(false)
+    }
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+    assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+    eventually(timeout(Span(5, Seconds))) {
+      streamExecution
+        .asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+        .arePendingAsyncPurge should be(false)
+    }
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDF()
+
+    val tableName = "test"
+
+    def startQuery(): StreamingQuery = {
+      ds.writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+        .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+        .option("checkpointLocation", checkpointLocation)
+        .start()
+    }
+    val query = startQuery()
+    val expected = new ListBuffer[Row]()
+    for (j <- 0 until 100) {
+      for (i <- 0 until 10) {
+        val v = i + (j * 10)
+        inputData.addData({ v })
+        expected += Row(v)
+      }
+      query.processAllAvailable()
+    }
+
+    checkAnswer(
+      spark.table(tableName),
+      expected.toSeq
+    )
+  }
+
+  test("async WAL commits recovery") {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val ds = inputData.toDF()
+
+    var index = 0
+    // to synchronize producing and consuming messages so that
+    // we can generate and read the desired number of batches
+    var countDownLatch = new CountDownLatch(10)
+    val sem = new Semaphore(1)
+    val data = new ListBuffer[Int]()
+    def startQuery(): StreamingQuery = {
+      ds.writeStream
+        .foreachBatch((ds: Dataset[Row], batchId: Long) => {
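The "async WAL commits recovery" test above declares a `CountDownLatch` and a `Semaphore` "to synchronize producing and consuming messages", but the quoted diff truncates before they are used. As a minimal standalone sketch of that kind of producer/consumer handshake (hypothetical object and variable names, plain `java.util.concurrent` primitives, no Spark dependency; the consumer thread stands in for the `foreachBatch` callback):

```scala
import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
import scala.collection.mutable.ListBuffer

object LatchSketch {
  def main(args: Array[String]): Unit = {
    val countDownLatch = new CountDownLatch(10) // wait until 10 "batches" are processed
    val sem = new Semaphore(1)                  // gates how many batches may proceed
    val data = ListBuffer[Int]()

    // Stand-in for the streaming sink: processes one batch per permit.
    val consumer = new Thread(() => {
      var i = 0
      while (i < 10) {
        sem.acquire()             // block until the driver allows another batch
        data += i                 // "process" the batch
        countDownLatch.countDown()
        i += 1
      }
    })
    consumer.start()

    // The single initial permit lets exactly one batch through; the driver
    // then decides when to release the remaining nine.
    for (_ <- 1 until 10) sem.release()
    assert(countDownLatch.await(10, TimeUnit.SECONDS))
    println(data.size)
  }
}
```

The `Semaphore(1)` choice mirrors the test's setup: one permit is available up front, so the driver controls exactly how many micro-batches the sink may complete before the next checkpoint-related assertion.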
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049032417

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##

@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking
jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049028853

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala: ##

@@ -0,0 +1,1865 @@