[GitHub] [spark] huanliwang-db commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStoreConf for its usage in RocksDBConf

2022-12-14 Thread GitBox


huanliwang-db commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049307954


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -560,10 +560,11 @@ case class RocksDBConf(
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
-  val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
+  val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
   private case class ConfEntry(name: String, default: String) {
-    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def lowerCaseName: String = name.toLowerCase(Locale.ROOT)

Review Comment:
   done






[GitHub] [spark] dongjoon-hyun commented on pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names

2022-12-14 Thread GitBox


dongjoon-hyun commented on PR #39070:
URL: https://github.com/apache/spark/pull/39070#issuecomment-1352674020

   Thank you, @viirya !





[GitHub] [spark] viirya commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names

2022-12-14 Thread GitBox


viirya commented on code in PR #39070:
URL: https://github.com/apache/spark/pull/39070#discussion_r1049306579


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -152,7 +152,7 @@ class ExecutorPodsAllocator(
   applicationId: String,
   schedulerBackend: KubernetesClusterSchedulerBackend,
   snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
-val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
+val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct

Review Comment:
   So `snapshots` may contain duplicates too?






[GitHub] [spark] zhengruifeng closed pull request #39013: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions

2022-12-14 Thread GitBox


zhengruifeng closed pull request #39013: [SPARK-41472][CONNECT][PYTHON] 
Implement the rest of string/binary functions
URL: https://github.com/apache/spark/pull/39013





[GitHub] [spark] zhengruifeng commented on pull request #39013: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions

2022-12-14 Thread GitBox


zhengruifeng commented on PR #39013:
URL: https://github.com/apache/spark/pull/39013#issuecomment-1352669725

   Let me take over this PR in https://github.com/apache/spark/pull/39071





[GitHub] [spark] zhengruifeng opened a new pull request, #39071: [SPARK-41472][CONNECT][PYTHON] Implement the rest of string/binary functions

2022-12-14 Thread GitBox


zhengruifeng opened a new pull request, #39071:
URL: https://github.com/apache/spark/pull/39071

   ### What changes were proposed in this pull request?
   Implement the rest of string/binary functions. The first commit is 
https://github.com/apache/spark/pull/38921.
   
   Among them, `format_number` has a data type mismatch issue and should be 
enabled together with https://issues.apache.org/jira/browse/SPARK-41473.
   
   ### Why are the changes needed?
   For API coverage on Spark Connect.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. New functions are available on Spark Connect.
   
   ### How was this patch tested?
   Unit tests.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names

2022-12-14 Thread GitBox


dongjoon-hyun commented on code in PR #39070:
URL: https://github.com/apache/spark/pull/39070#discussion_r1049302741


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -162,7 +162,7 @@ class ExecutorPodsAllocator(
 val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
   pod.getSpec.getVolumes.asScala
     .flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
-}
+}.distinct

Review Comment:
   This is since Spark 3.2.0.






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots` to use unique lists of known executor IDs and PVC names

2022-12-14 Thread GitBox


dongjoon-hyun commented on code in PR #39070:
URL: https://github.com/apache/spark/pull/39070#discussion_r1049302331


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala:
##
@@ -152,7 +152,7 @@ class ExecutorPodsAllocator(
   applicationId: String,
   schedulerBackend: KubernetesClusterSchedulerBackend,
   snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
-val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
+val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct

Review Comment:
   The original code is in Spark 3.1.2 and Spark 3.2.0.






[GitHub] [spark] dongjoon-hyun commented on pull request #39070: [SPARK-41525][K8S] Improve `onNewSnapshots ` to use unique list of known executor IDs and PVC names

2022-12-14 Thread GitBox


dongjoon-hyun commented on PR #39070:
URL: https://github.com/apache/spark/pull/39070#issuecomment-1352661946

   cc @viirya and @attilapiros 





[GitHub] [spark] cloud-fan commented on pull request #39017: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

2022-12-14 Thread GitBox


cloud-fan commented on PR #39017:
URL: https://github.com/apache/spark/pull/39017#issuecomment-1352661404

   > and then the python client side can generate all the sampled splits from 
this one.
   
   SGTM
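
   For context, a rough sketch of that idea in plain Scala (independent of the 
   Connect protocol; the helper below is an assumption based on how 
   `Dataset.randomSplit` normalizes weights into cumulative boundaries, with each 
   boundary pair then backing one seeded sample over the same deterministically 
   ordered plan):

   ```scala
   object RandomSplitSketch {
     // Turn split weights into adjacent (lower, upper) fractions in [0, 1].
     def boundaries(weights: Seq[Double]): Seq[(Double, Double)] = {
       val total = weights.sum
       val cumulative = weights.scanLeft(0.0)(_ + _).map(_ / total)
       cumulative.init.zip(cumulative.tail)
     }

     def main(args: Array[String]): Unit = {
       // Each pair would become one Sample(lower, upper, seed) relation on the client.
       boundaries(Seq(1.0, 2.0, 3.0)).foreach(println)
       // (0.0,0.166...), (0.166...,0.5), (0.5,1.0)
     }
   }
   ```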





[GitHub] [spark] dongjoon-hyun opened a new pull request, #39070: [SPARK-41525][K8S] Use unique list of known executor IDs and PVC names in onNewSnapshots

2022-12-14 Thread GitBox


dongjoon-hyun opened a new pull request, #39070:
URL: https://github.com/apache/spark/pull/39070

   ### What changes were proposed in this pull request?
   
   This PR improves `ExecutorPodsAllocator.onNewSnapshots` by removing duplicates 
from `k8sKnownExecIds` and `k8sKnownPVCNames`.
   
   ### Why are the changes needed?
   
   The existing variables contain many duplicates because `snapshots` is a 
`Seq[ExecutorPodsSnapshot]`, so the same IDs and names are collected once per 
snapshot.
   ```
   val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
   ```
   
   For example, if we print out the values, it looks like the following.
   ```
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 1
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 2
   22/12/15 07:09:37 INFO ExecutorPodsAllocator: 3
   ```
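
   A minimal, self-contained sketch of the duplication shown above and how 
   `.distinct` removes it (the data and types here are illustrative, not the 
   actual `ExecutorPodsAllocator` structures):

   ```scala
   object SnapshotDedupSketch {
     def main(args: Array[String]): Unit = {
       // Every snapshot carries the full executor map, so flattening a Seq of
       // snapshots repeats each executor ID once per snapshot.
       val snapshots: Seq[Map[Long, String]] = Seq(
         Map(1L -> "pod-1", 2L -> "pod-2"),
         Map(1L -> "pod-1", 2L -> "pod-2", 3L -> "pod-3"))

       println(snapshots.flatMap(_.keys))           // List(1, 2, 1, 2, 3)
       println(snapshots.flatMap(_.keys).distinct)  // List(1, 2, 3)
     }
   }
   ```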
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Manual review because this is an improvement on the local variable 
computation.
   





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049288529


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala:
##
@@ -71,11 +71,10 @@ class StateStoreConf(
 
   /**
    * Additional configurations related to state store. This will capture all configs in
-   * SQLConf that start with `spark.sql.streaming.stateStore.` and extraOptions for a specific
-   * operator.
+   * SQLConf that start with `spark.sql.streaming.stateStore.`
    */
-  val confs: Map[String, String] =
-    sqlConf.getAllConfs.filter(_._1.startsWith("spark.sql.streaming.stateStore.")) ++ extraOptions
+  val stateStoreSQLConfs: Map[String, String] =

Review Comment:
   nit: sqlConfs






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049287294


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -560,10 +560,11 @@ case class RocksDBConf(
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
-  val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
+  val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
   private case class ConfEntry(name: String, default: String) {
-    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def lowerCaseName: String = name.toLowerCase(Locale.ROOT)

Review Comment:
   After the change, the method name does not need an additional `FromSQL` or 
`FromExtraOptions` suffix, since the parameter makes the source clear.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049287294


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -560,10 +560,11 @@ case class RocksDBConf(
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
-  val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
+  val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
   private case class ConfEntry(name: String, default: String) {
-    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def lowerCaseName: String = name.toLowerCase(Locale.ROOT)

Review Comment:
   After the change, the method name does not need an additional `FromSQL` suffix, 
since the parameter makes the source clear.






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049286005


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -560,10 +560,11 @@ case class RocksDBConf(
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
-  val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
+  val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
   private case class ConfEntry(name: String, default: String) {
-    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def lowerCaseName: String = name.toLowerCase(Locale.ROOT)

Review Comment:
   What about having a different class for each case instead? It would be clearer 
for distinguishing configs, e.g. we don't expect the same config to be set from 
both SQL conf and extraOptions. If we allow such a case, it'll be a bit more 
complicated than what you proposed, e.g. we would have to define a precedence.
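
   For illustration, one possible shape of that suggestion (the class and config 
   names below are hypothetical sketches, not the actual Spark code):

   ```scala
   import java.util.Locale

   // Keep SQLConf-backed entries and extraOptions-backed entries as distinct types,
   // so each entry knows where its value comes from and no precedence rule between
   // the two sources is needed.
   object RocksDBConfSketch {
     val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"

     sealed trait ConfEntry { def name: String; def default: String }

     case class SQLConfEntry(name: String, default: String) extends ConfEntry {
       // Looked up in the SQLConf-derived map by its fully qualified, lower-cased key.
       def fullName: String =
         s"$ROCKSDB_SQL_CONF_NAME_PREFIX.$name".toLowerCase(Locale.ROOT)
     }

     case class ExtraOptionsConfEntry(name: String, default: String) extends ConfEntry {
       // Looked up in the per-operator extraOptions map by its bare, lower-cased name.
       def lowerCaseName: String = name.toLowerCase(Locale.ROOT)
     }

     def main(args: Array[String]): Unit = {
       println(SQLConfEntry("compactOnCommit", "false").fullName)
       println(ExtraOptionsConfEntry("someOperatorOption", "true").lowerCaseName)
     }
   }
   ```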






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #39069:
URL: https://github.com/apache/spark/pull/39069#discussion_r1049286005


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -560,10 +560,11 @@ case class RocksDBConf(
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
-  val ROCKSDB_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
+  val ROCKSDB_SQL_CONF_NAME_PREFIX = "spark.sql.streaming.stateStore.rocksdb"
 
   private case class ConfEntry(name: String, default: String) {
-    def fullName: String = s"$ROCKSDB_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def sqlConfFullName: String = s"$ROCKSDB_SQL_CONF_NAME_PREFIX.${name}".toLowerCase(Locale.ROOT)
+    def lowerCaseName: String = name.toLowerCase(Locale.ROOT)

Review Comment:
   What about having a different class for each case? It would be clearer for 
distinguishing configs, e.g. we don't expect the same config to be set from both 
SQL conf and extraOptions. If we allow such a case, it'll be a bit more 
complicated than what you proposed, e.g. we would have to define a precedence.






[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng commented on PR #39068:
URL: https://github.com/apache/spark/pull/39068#issuecomment-1352636404

   cc @HyukjinKwon @cloud-fan @amaliujia @grundprinzip @hvanhovell 





[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng commented on PR #39068:
URL: https://github.com/apache/spark/pull/39068#issuecomment-1352619900

   The `./dev/test-dependencies.sh` failure is unrelated to this change.





[GitHub] [spark] jerrypeng commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on PR #38517:
URL: https://github.com/apache/spark/pull/38517#issuecomment-1352612404

   @HeartSaVioR thanks for the detailed review. I think I have addressed all of 
your comments. PTAL





[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049255090


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -195,6 +200,102 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 true
   }
 
+  /**
+   * Test async progress tracking capability with Kafka source and sink
+   */
+  test("async progress tracking") {
+val inputTopic = newTopic()
+testUtils.createTopic(inputTopic, partitions = 5)
+
+val dataSent = new ListBuffer[String]()
+testUtils.sendMessages(inputTopic, (0 until 15).map { case x =>
+  val m = s"foo-$x"
+  dataSent += m
+  m
+}.toArray, Some(0))
+
+val outputTopic = newTopic()
+testUtils.createTopic(outputTopic, partitions = 5)
+
+withTempDir { dir =>
+  val reader = spark
+.readStream
+.format("kafka")
+.option("kafka.bootstrap.servers", testUtils.brokerAddress)
+.option("kafka.metadata.max.age.ms", "1")
+.option("maxOffsetsPerTrigger", 5)
+.option("subscribe", inputTopic)
+.option("startingOffsets", "earliest")
+.load()
+
+  def startQuery(): StreamingQuery = {
+reader.writeStream
+  .format("kafka")
+  .option("checkpointLocation", dir.getCanonicalPath)
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.max.block.ms", "5000")
+  .option("topic", outputTopic)
+  .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+  .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 1000)
+  .queryName("kafkaStream")
+  .start()
+  }
+
+  def readResults(): ListBuffer[String] = {

Review Comment:
   ok






[GitHub] [spark] beliefer commented on pull request #38799: [SPARK-37099][SQL] Introduce the group limit of Window for rank-based filter to optimize top-k computation

2022-12-14 Thread GitBox


beliefer commented on PR #38799:
URL: https://github.com/apache/spark/pull/38799#issuecomment-1352608435

   The GA failure is unrelated to this PR.





[GitHub] [spark] huanliwang-db opened a new pull request, #39069: [SPARK-41524][SS] Differentiate SQLConf and extraOptions in StateStor…

2022-12-14 Thread GitBox


huanliwang-db opened a new pull request, #39069:
URL: https://github.com/apache/spark/pull/39069

   …eConf for its usage in RocksDBConf
   
   
   
   ### What changes were proposed in this pull request?
   Currently, StateStoreConf is consumed via 
[confs](https://github.com/huanliwang-db/spark/blob/7671bc975f2d88ab929e4982abfe3e166fa72e35/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala#L77-L78),
 which merges both SQL confs and extraOptions into one map. The config names in 
extraOptions shouldn't have to follow the SQL conf name prefix, because they are 
not bound to the SQL conf context.
   
   
   
   
   ### Why are the changes needed?
   
   After differentiating SQL conf and extraOptions in StateStoreConf, we should be 
able to support more use cases for operator-level configs through the 
extraOptions.
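
   A simplified sketch of the separation described above (class, map, and option 
   names are illustrative, not the real `StateStoreConf`):

   ```scala
   // Before: SQL confs and per-operator extraOptions were merged into one map,
   // which forced extraOptions keys to look like SQL conf names.
   // After: the two sources are kept apart, each with its own naming rules.
   object StateStoreConfSketch {
     def main(args: Array[String]): Unit = {
       val allSqlConfs = Map(
         "spark.sql.streaming.stateStore.rocksdb.compactOnCommit" -> "true",
         "spark.sql.shuffle.partitions" -> "200")
       // Hypothetical per-operator option; the key does not carry the SQL conf prefix.
       val extraOptions = Map("someOperatorOption" -> "true")

       val prefix = "spark.sql.streaming.stateStore."

       // Old shape: one merged map.
       val confs = allSqlConfs.filter(_._1.startsWith(prefix)) ++ extraOptions

       // New shape: two separate maps.
       val stateStoreSqlConfs = allSqlConfs.filter(_._1.startsWith(prefix))
       val operatorOptions = extraOptions

       println(confs)
       println(stateStoreSqlConfs)
       println(operatorOptions)
     }
   }
   ```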
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   
   ### How was this patch tested?
   Existing UT should cover the change
   
   





[GitHub] [spark] MaxGekk closed pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`

2022-12-14 Thread GitBox


MaxGekk closed pull request #38864: [SPARK-41271][SQL] Support parameterized 
SQL queries by `sql()`
URL: https://github.com/apache/spark/pull/38864





[GitHub] [spark] MaxGekk closed pull request #38712: [WIP][SPARK-41271][SQL] Parameterized SQL queries

2022-12-14 Thread GitBox


MaxGekk closed pull request #38712: [WIP][SPARK-41271][SQL] Parameterized SQL 
queries
URL: https://github.com/apache/spark/pull/38712





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906


##
python/pyspark/sql/connect/functions.py:
##
@@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: 
Any) -> Column:
 return _invoke_function(name, *_cols)
 
 
+def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
+signature = inspect.signature(f)
+parameters = signature.parameters.values()
+
+# We should exclude functions that use
+# variable args and keyword argnames
+# as well as keyword only args
+supported_parameter_types = {
+inspect.Parameter.POSITIONAL_OR_KEYWORD,
+inspect.Parameter.POSITIONAL_ONLY,
+}
+
+# Validate that
+# function arity is between 1 and 3
+if not (1 <= len(parameters) <= 3):
+raise ValueError(
+"f should take between 1 and 3 arguments, but provided function 
takes {}".format(
+len(parameters)
+)
+)
+
+# and all arguments can be used as positional
+if not all(p.kind in supported_parameter_types for p in parameters):
+raise ValueError("f should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments")
+
+return parameters
+
+
+def _create_lambda(f: Callable) -> LambdaFunction:
+"""
+Create `o.a.s.sql.expressions.LambdaFunction` corresponding
+to transformation described by f
+
+:param f: A Python of one of the following forms:
+- (Column) -> Column: ...
+- (Column, Column) -> Column: ...
+- (Column, Column, Column) -> Column: ...
+"""
+parameters = _get_lambda_parameters(f)
+
+arg_names = ["x", "y", "z"]
+
+arg_cols: List[Column] = []
+for arg in arg_names[: len(parameters)]:
+# TODO: How to make sure lambda variable names are unique? RPC for increasing ID?

Review Comment:
   PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in JVM to get a 
unique variable name
   
   
   ```
   object UnresolvedNamedLambdaVariable {
   
 // Counter to ensure lambda variable names are unique
 private val nextVarNameId = new AtomicInteger(0)
   
 def freshVarName(name: String): String = {
   s"${name}_${nextVarNameId.getAndIncrement()}"
 }
   }
   ```
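
   A self-contained version of the counter approach quoted above, with a small 
   usage example (the real object lives in Catalyst; this copy is only for 
   illustration):

   ```scala
   import java.util.concurrent.atomic.AtomicInteger

   object FreshVarNameSketch {
     // Counter to ensure lambda variable names are unique.
     private val nextVarNameId = new AtomicInteger(0)

     def freshVarName(name: String): String =
       s"${name}_${nextVarNameId.getAndIncrement()}"

     def main(args: Array[String]): Unit = {
       println(freshVarName("x"))  // x_0
       println(freshVarName("x"))  // x_1 -- two lambdas never reuse a variable name
     }
   }
   ```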






[GitHub] [spark] MaxGekk commented on pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`

2022-12-14 Thread GitBox


MaxGekk commented on PR #38864:
URL: https://github.com/apache/spark/pull/38864#issuecomment-1352605299

   Merging to master. The last commit is a minor one.
   Thank you, @cloud-fan @xkrogen @entong @srielau for review.





[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049249895


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] RussellSpitzer commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

2022-12-14 Thread GitBox


RussellSpitzer commented on code in PR #38823:
URL: https://github.com/apache/spark/pull/38823#discussion_r1049247458


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java:
##
@@ -93,4 +93,18 @@ default Transform[] 
inferPartitioning(CaseInsensitiveStringMap options) {
   default boolean supportsExternalMetadata() {
 return false;
   }
+
+  /**
+   * Returns true if the source supports defining generated columns upon table 
creation in SQL.
+   * When false: any create/replace table statements with a generated column 
defined in the table
+   * schema will throw an exception during analysis.
+   *
+   * A generated column is defined with syntax: {@code colName colType 
GENERATED ALWAYS AS (expr)}
+   * The generation expression is stored in the column metadata with key 
"generationExpression".
+   *
+   * Override this method to allow defining generated columns in 
create/replace table statements.
+   */
+  default boolean supportsGeneratedColumnsOnCreation() {

Review Comment:
   This really should be part of the greater catalog capabilities, since that's 
where create table is usually going to be invoked. I'm very nervous about saying 
that it is up to the DataSource to decide what is valid, because different 
engines may decide the same SQL means different things, and this would require 
that the DataSource somehow make sure non-Spark engines can access the same table 
in the same way. We spent a lot of time making a public expression class for the 
connectors, but it feels like that should probably be invoked here as well?
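
   For reference, an illustration of the DDL syntax described in the Javadoc 
   above; the table, columns, and `somesource` provider are hypothetical, and such 
   a statement would be rejected at analysis time unless the source opts in via 
   `supportsGeneratedColumnsOnCreation()`:

   ```scala
   object GeneratedColumnSyntaxSketch {
     def main(args: Array[String]): Unit = {
       // The generation expression would be stored in the column metadata under
       // the "generationExpression" key, as described above.
       val ddl =
         """CREATE TABLE events (
           |  event_time TIMESTAMP,
           |  event_date DATE GENERATED ALWAYS AS (CAST(event_time AS DATE))
           |) USING somesource""".stripMargin
       println(ddl)  // would be passed to spark.sql(ddl) against a supporting catalog
     }
   }
   ```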






[GitHub] [spark] LuciferYang commented on pull request #39067: [DON'T MERGE] Test test-dependencies

2022-12-14 Thread GitBox


LuciferYang commented on PR #39067:
URL: https://github.com/apache/spark/pull/39067#issuecomment-1352598729

   This differs from the error seen locally; further investigation is required.
   
   





[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049243767


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+  extraOptions: Map[String, String]): Long = {
+extraOptions
+  .getOrElse(
+
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+"1000"
+  )
+  .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+sparkSession: SparkSession,
+trigger: Trigger,
+triggerClock: Clock,
+extraOptions: Map[String, String],
+plan: WriteToStream)
+extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+.getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as 
micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+"async-log-write",
+2, // one for offset commit and one for completion commit
+new RejectedExecutionHandler() {
+  override def rejectedExecution(r: Runnable, executor: 
ThreadPoolExecutor): Unit = {
+try {
+  if (!executor.isShutdown) {
+val start = System.currentTimeMillis()
+executor.getQueue.put(r)
+logDebug(
+  s"Async write paused execution for " +
+s"${System.currentTimeMillis() - start} due to task queue 
being full."
+)
+  }
+} catch {
+  case e: InterruptedException =>
+Thread.currentThread.interrupt()
+throw new RejectedExecutionException("Producer interrupted", e)
+  case e: Throwable =>
+logError("Encountered error in async write executor service", e)
+errorNotifier.markError(e)
+}
+  }
+})
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+sparkSession,
+checkpointFile("offsets"),
+asyncWritesExecutorService,
+asyncProgressTrackingCheckpointingIntervalMs,
+clock = triggerClock
+  )
+
+  override val commitLog =
+new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+// check if pipeline is stateful
+checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+// this is a no op for async progress tracking since we only want to 
commit sources only
+// after the offset 

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049243767


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+  extraOptions: Map[String, String]): Long = {
+extraOptions
+  .getOrElse(
+
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+"1000"
+  )
+  .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+sparkSession: SparkSession,
+trigger: Trigger,
+triggerClock: Clock,
+extraOptions: Map[String, String],
+plan: WriteToStream)
+extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+.getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as 
micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+"async-log-write",
+2, // one for offset commit and one for completion commit
+new RejectedExecutionHandler() {
+  override def rejectedExecution(r: Runnable, executor: 
ThreadPoolExecutor): Unit = {
+try {
+  if (!executor.isShutdown) {
+val start = System.currentTimeMillis()
+executor.getQueue.put(r)
+logDebug(
+  s"Async write paused execution for " +
+s"${System.currentTimeMillis() - start} due to task queue 
being full."
+)
+  }
+} catch {
+  case e: InterruptedException =>
+Thread.currentThread.interrupt()
+throw new RejectedExecutionException("Producer interrupted", e)
+  case e: Throwable =>
+logError("Encountered error in async write executor service", e)
+errorNotifier.markError(e)
+}
+  }
+})
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+sparkSession,
+checkpointFile("offsets"),
+asyncWritesExecutorService,
+asyncProgressTrackingCheckpointingIntervalMs,
+clock = triggerClock
+  )
+
+  override val commitLog =
+new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+// check if pipeline is stateful
+checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+// this is a no op for async progress tracking since we only want to 
commit sources only
+// after the offset 

[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1049238906


##
python/pyspark/sql/connect/functions.py:
##
@@ -79,6 +85,84 @@ def _invoke_binary_math_function(name: str, col1: Any, col2: 
Any) -> Column:
 return _invoke_function(name, *_cols)
 
 
+def _get_lambda_parameters(f: Callable) -> ValuesView[inspect.Parameter]:
+signature = inspect.signature(f)
+parameters = signature.parameters.values()
+
+# We should exclude functions that use
+# variable args and keyword argnames
+# as well as keyword only args
+supported_parameter_types = {
+inspect.Parameter.POSITIONAL_OR_KEYWORD,
+inspect.Parameter.POSITIONAL_ONLY,
+}
+
+# Validate that
+# function arity is between 1 and 3
+if not (1 <= len(parameters) <= 3):
+raise ValueError(
+"f should take between 1 and 3 arguments, but provided function 
takes {}".format(
+len(parameters)
+)
+)
+
+# and all arguments can be used as positional
+if not all(p.kind in supported_parameter_types for p in parameters):
+raise ValueError("f should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments")
+
+return parameters
+
+
+def _create_lambda(f: Callable) -> LambdaFunction:
+"""
+Create `o.a.s.sql.expressions.LambdaFunction` corresponding
+to transformation described by f
+
+:param f: A Python of one of the following forms:
+- (Column) -> Column: ...
+- (Column, Column) -> Column: ...
+- (Column, Column, Column) -> Column: ...
+"""
+parameters = _get_lambda_parameters(f)
+
+arg_names = ["x", "y", "z"]
+
+arg_cols: List[Column] = []
+for arg in arg_names[: len(parameters)]:
+# TODO: How to make sure lambda variable names are unique? RPC for increasing ID?

Review Comment:
   PySpark invokes `UnresolvedNamedLambdaVariable.freshVarName` in JVM to get a 
unique variable name
   

https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L8157
   
   ```
   object UnresolvedNamedLambdaVariable {
   
 // Counter to ensure lambda variable names are unique
 private val nextVarNameId = new AtomicInteger(0)
   
 def freshVarName(name: String): String = {
   s"${name}_${nextVarNameId.getAndIncrement()}"
 }
   }
   ```






[GitHub] [spark] zhengruifeng commented on pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng commented on PR #39068:
URL: https://github.com/apache/spark/pull/39068#issuecomment-1352588465

   reviewers can refer to the implementation in PySpark
   
   
https://github.com/apache/spark/blob/master/python/pyspark/sql/functions.py#L8094-L8197





[GitHub] [spark] zhengruifeng opened a new pull request, #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-14 Thread GitBox


zhengruifeng opened a new pull request, #39068:
URL: https://github.com/apache/spark/pull/39068

   ### What changes were proposed in this pull request?
   There are 11 lambda functions among the higher-order functions; this PR adds 
basic support for `LambdaFunction` and adds the `array_exists` function.
   
   ### Why are the changes needed?
   for API coverage
   
   
   ### Does this PR introduce _any_ user-facing change?
   yes, new API
   
   
   ### How was this patch tested?
   added UT





[GitHub] [spark] dongjoon-hyun commented on pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade `kubernetes-client` to 6.3.0

2022-12-14 Thread GitBox


dongjoon-hyun commented on PR #39065:
URL: https://github.com/apache/spark/pull/39065#issuecomment-1352587300

   Also, cc @Yikun 





[GitHub] [spark] dongjoon-hyun closed pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade `kubernetes-client` to 6.3.0

2022-12-14 Thread GitBox


dongjoon-hyun closed pull request #39065: [SPARK-41521][BUILD][K8S] Upgrade 
`kubernetes-client` to 6.3.0
URL: https://github.com/apache/spark/pull/39065





[GitHub] [spark] dongjoon-hyun commented on pull request #39067: [DON'T MERGE] Test test-dependencies

2022-12-14 Thread GitBox


dongjoon-hyun commented on PR #39067:
URL: https://github.com/apache/spark/pull/39067#issuecomment-1352584283

   Thank you for investigating this, @LuciferYang .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang opened a new pull request, #39067: [DON'T MERGE] Test test-dependencies

2022-12-14 Thread GitBox


LuciferYang opened a new pull request, #39067:
URL: https://github.com/apache/spark/pull/39067

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang opened a new pull request, #39066: [SPARK-41523][PROTBUF][BUILD] Change `protoc-jar-maven-plugin` definition in `user-defined-protoc` profile use `protoc-jar-maven-plugin.

2022-12-14 Thread GitBox


LuciferYang opened a new pull request, #39066:
URL: https://github.com/apache/spark/pull/39066

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38864: [SPARK-41271][SQL] Support parameterized SQL queries by `sql()`

2022-12-14 Thread GitBox


cloud-fan commented on code in PR #38864:
URL: https://github.com/apache/spark/pull/38864#discussion_r1049204162


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/parameters.scala:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PARAMETER, TreePattern}
+import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.types.DataType
+
+/**
+ * The expression represents a named parameter that should be replaces by a 
literal.

Review Comment:
   ```suggestion
* The expression represents a named parameter that should be replaced by a 
literal.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MrDLontheway commented on a diff in pull request #38893: [Spark-40099][SQL] Merge adjacent CaseWhen branches if their values are the same

2022-12-14 Thread GitBox


MrDLontheway commented on code in PR #38893:
URL: https://github.com/apache/spark/pull/38893#discussion_r1049199875


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -639,6 +639,26 @@ object RemoveNoopOperators extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Merge case condition expression with same value.
+ */
+object MergeConditionWithValue extends Rule[LogicalPlan] {

Review Comment:
   @wangyum any update?
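
For readers of the archive: the rule body is not quoted in this hunk, so the following is a hedged sketch only of the kind of rewrite the rule's title and doc comment describe (merging adjacent branches that return the same value); it is not the PR's actual code.

```scala
// Sketch only: merge adjacent CASE WHEN branches whose values are semantically
// equal, e.g. CASE WHEN a THEN 1 WHEN b THEN 1 ELSE 2 END
//         ==> CASE WHEN a OR b THEN 1 ELSE 2 END
import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Expression, Or}

def mergeAdjacentBranches(cw: CaseWhen): CaseWhen = {
  val merged = cw.branches.foldLeft(List.empty[(Expression, Expression)]) {
    // previous branch returns the same value: OR the conditions together
    case ((prevCond, prevValue) :: rest, (cond, value)) if prevValue.semanticEquals(value) =>
      (Or(prevCond, cond), prevValue) :: rest
    case (acc, branch) =>
      branch :: acc
  }.reverse
  cw.copy(branches = merged)
}
```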



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #39057: [SPARK-41513][SQL] Implement an accumulator to collect per mapper row count metrics

2022-12-14 Thread GitBox


cloud-fan commented on code in PR #39057:
URL: https://github.com/apache/spark/pull/39057#discussion_r1049199171


##
core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala:
##
@@ -513,3 +513,81 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, 
java.util.List[T]] {
 getOrCreate.addAll(newValue)
   }
 }
+
+
+/**
+ * An [[AccumulatorV2 counter]] for collecting a list of (mapper id, row 
count).
+ *
+ * @since 3.4.0
+ */
+class MapperRowCounter extends AccumulatorV2[jl.Long, 
java.util.List[java.util.List[jl.Long]]] {
+
+  private var _agg: java.util.List[java.util.List[jl.Long]] = _

Review Comment:
   a list of tuple2[int, int]?
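
As a hedged sketch of the direction this comment suggests (tuples instead of nested Java lists); this is an assumption about intent, not code from the PR.

```scala
// Sketch only: an AccumulatorV2 that collects (mapper id, row count) pairs as
// tuples instead of nested java.util.Lists. Not the PR's implementation.
import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.spark.util.AccumulatorV2

class MapperRowCounterSketch extends AccumulatorV2[(Int, Long), JList[(Int, Long)]] {
  private val counts = new JArrayList[(Int, Long)]()

  override def isZero: Boolean = counts.isEmpty
  override def copy(): MapperRowCounterSketch = {
    val copied = new MapperRowCounterSketch
    copied.counts.addAll(counts)
    copied
  }
  override def reset(): Unit = counts.clear()
  override def add(v: (Int, Long)): Unit = counts.add(v)
  override def merge(other: AccumulatorV2[(Int, Long), JList[(Int, Long)]]): Unit =
    counts.addAll(other.value)
  override def value: JList[(Int, Long)] = counts
}
```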



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #38888: [SPARK-41405][SQL] Centralize the column resolution logic

2022-12-14 Thread GitBox


cloud-fan commented on code in PR #3:
URL: https://github.com/apache/spark/pull/3#discussion_r1049198193


##
sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out:
##
@@ -149,9 +149,9 @@ org.apache.spark.sql.AnalysisException
   "queryContext" : [ {

Review Comment:
   The error class
   ```
 "_LEGACY_ERROR_TEMP_2422" : {
   "message" : [
 "grouping expressions sequence is empty, and '' is not an 
aggregate function. Wrap '' in windowing function(s) or wrap 
'' in first() (or first_value) if you don't care which value you get."
   ]
 },
   ```
   
   The query context becomes more accurate actually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SandishKumarHN commented on a diff in pull request #39039: [SPARK-40776][SQL][PROTOBUF][DOCS] Spark-Protobuf docs

2022-12-14 Thread GitBox


SandishKumarHN commented on code in PR #39039:
URL: https://github.com/apache/spark/pull/39039#discussion_r1049183578


##
docs/sql-data-sources-protobuf.md:
##
@@ -0,0 +1,301 @@
+---

Review Comment:
   @rangadi it appears to be the same for spark-avro as well: 
https://github.com/apache/spark/blob/master/docs/sql-data-sources-avro.md#to_avro-and-from_avro.
 It will be rendered as a webpage rather than a README file. The spark-avro 
documentation can be found at 
https://spark.apache.org/docs/latest/sql-data-sources-avro.html. I am not sure 
how to test it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] SandishKumarHN commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-14 Thread GitBox


SandishKumarHN commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049175487


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -92,14 +106,26 @@ object SchemaConverters {
 MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
 nullable = false))
   case MESSAGE =>
-if (existingRecordNames.contains(fd.getFullName)) {
+// Setting the circularReferenceDepth to 0 allows the field to be 
recursed once, setting
+// it to 1 allows it to be recursed twice, and setting it to 2 allows 
it to be recursed
+// thrice. circularReferenceDepth value greater than 2 is not allowed. 
If the not
+// specified, it will default to -1, which disables recursive fields.
+val recordName = fd.getMessageType.getFullName
+if (existingRecordNames.contains(recordName) &&

Review Comment:
   @rangadi thanks for the review, I have made all changes you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #39065: [SPARK-41521][BUILD] Upgrade `kubernetes-client` to 6.3.0

2022-12-14 Thread GitBox


dongjoon-hyun commented on PR #39065:
URL: https://github.com/apache/spark/pull/39065#issuecomment-1352481238

   BTW, there exist breaking changes. Does this pass in your GitHub Action?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-14 Thread GitBox


rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049157233


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -92,14 +106,26 @@ object SchemaConverters {
 MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
 nullable = false))
   case MESSAGE =>
-if (existingRecordNames.contains(fd.getFullName)) {
+// Setting the circularReferenceDepth to 0 allows the field to be 
recursed once, setting
+// it to 1 allows it to be recursed twice, and setting it to 2 allows 
it to be recursed
+// thrice. circularReferenceDepth value greater than 2 is not allowed. 
If the not
+// specified, it will default to -1, which disables recursive fields.
+val recordName = fd.getMessageType.getFullName
+if (existingRecordNames.contains(recordName) &&

Review Comment:
   Scratch the above suggestion. 
   Instead you could add 'else' to what you have and remove 'return'. That is 
simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-14 Thread GitBox


rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049157233


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -92,14 +106,26 @@ object SchemaConverters {
 MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
 nullable = false))
   case MESSAGE =>
-if (existingRecordNames.contains(fd.getFullName)) {
+// Setting the circularReferenceDepth to 0 allows the field to be 
recursed once, setting
+// it to 1 allows it to be recursed twice, and setting it to 2 allows 
it to be recursed
+// thrice. circularReferenceDepth value greater than 2 is not allowed. 
If the not
+// specified, it will default to -1, which disables recursive fields.
+val recordName = fd.getMessageType.getFullName
+if (existingRecordNames.contains(recordName) &&

Review Comment:
   Scratch the above suggestion. 
   Instead you could add 'else' to what you have and remove 'return'. That is 
simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dengziming commented on pull request #39059: [SPARK-41506][CONNECT][MINOR]: small fix in python code

2022-12-14 Thread GitBox


dengziming commented on PR #39059:
URL: https://github.com/apache/spark/pull/39059#issuecomment-1352477957

   Thank you @zhengruifeng @srowen. The response time of the Spark project is so 
short, awesome.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`

2022-12-14 Thread GitBox


beliefer commented on PR #39035:
URL: https://github.com/apache/spark/pull/39035#issuecomment-1352465823

   @zhengruifeng @amaliujia Thank you all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] panbingkun opened a new pull request, #39065: [SPARK-41521][BUILD] Upgrade fabric8io - kubernetes-client from 6.2.0 to 6.3.0

2022-12-14 Thread GitBox


panbingkun opened a new pull request, #39065:
URL: https://github.com/apache/spark/pull/39065

   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Pass GA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39048: [SPARK-41424][UI] Protobuf serializer for TaskDataWrapper

2022-12-14 Thread GitBox


gengliangwang commented on code in PR #39048:
URL: https://github.com/apache/spark/pull/39048#discussion_r1049145069


##
core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.status.TaskDataWrapper
+import org.apache.spark.status.api.v1.AccumulableInfo
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+object TaskDataWrapperSerializer {
+  def serialize(input: TaskDataWrapper): Array[Byte] = {
+val builder = StoreTypes.TaskDataWrapper.newBuilder()
+  .setTaskId(input.taskId)
+  .setIndex(input.index)
+  .setAttempt(input.attempt)
+  .setPartitionId(input.partitionId)
+  .setLaunchTime(input.launchTime)
+  .setResultFetchStart(input.resultFetchStart)
+  .setDuration(input.duration)
+  .setExecutorId(input.executorId)
+  .setHost(input.host)
+  .setStatus(input.status)
+  .setTaskLocality(input.taskLocality)
+  .setSpeculative(input.speculative)
+  .setHasMetrics(input.hasMetrics)
+  .setExecutorDeserializeTime(input.executorDeserializeTime)
+  .setExecutorDeserializeCpuTime(input.executorDeserializeCpuTime)
+  .setExecutorRunTime(input.executorRunTime)
+  .setExecutorCpuTime(input.executorCpuTime)
+  .setResultSize(input.resultSize)
+  .setJvmGcTime(input.jvmGcTime)
+  .setResultSerializationTime(input.resultSerializationTime)
+  .setMemoryBytesSpilled(input.memoryBytesSpilled)
+  .setDiskBytesSpilled(input.diskBytesSpilled)
+  .setPeakExecutionMemory(input.peakExecutionMemory)
+  .setInputBytesRead(input.inputBytesRead)
+  .setInputRecordsRead(input.inputRecordsRead)
+  .setOutputBytesWritten(input.outputBytesWritten)
+  .setOutputRecordsWritten(input.outputRecordsWritten)
+  .setShuffleRemoteBlocksFetched(input.shuffleRemoteBlocksFetched)
+  .setShuffleLocalBlocksFetched(input.shuffleLocalBlocksFetched)
+  .setShuffleFetchWaitTime(input.shuffleFetchWaitTime)
+  .setShuffleRemoteBytesRead(input.shuffleRemoteBytesRead)
+  .setShuffleRemoteBytesReadToDisk(input.shuffleRemoteBytesReadToDisk)
+  .setShuffleLocalBytesRead(input.shuffleLocalBytesRead)
+  .setShuffleRecordsRead(input.shuffleRecordsRead)
+  .setShuffleBytesWritten(input.shuffleBytesWritten)
+  .setShuffleWriteTime(input.shuffleWriteTime)
+  .setShuffleRecordsWritten(input.shuffleRecordsWritten)
+  .setStageId(input.stageId)
+  .setStageAttemptId(input.stageAttemptId)
+input.errorMessage.foreach(builder.setErrorMessage)
+input.accumulatorUpdates.foreach { update =>
+  builder.addAccumulatorUpdates(serializeAccumulableInfo(update))
+}
+builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): TaskDataWrapper = {
+val binary = StoreTypes.TaskDataWrapper.parseFrom(bytes)
+val accumulatorUpdates = new ArrayBuffer[AccumulableInfo]()
+binary.getAccumulatorUpdatesList.forEach { update =>
+  accumulatorUpdates.append(new AccumulableInfo(
+id = update.getId,
+name = update.getName,
+update = getOptional(update.hasUpdate, update.getUpdate),
+value = update.getValue))
+}
+new TaskDataWrapper(
+  taskId = binary.getTaskId,
+  index = binary.getIndex,
+  attempt = binary.getAttempt,
+  partitionId = binary.getPartitionId,
+  launchTime = binary.getLaunchTime,
+  resultFetchStart = binary.getResultFetchStart,
+  duration = binary.getDuration,
+  executorId = binary.getExecutorId,
+  host = binary.getHost,
+  status = binary.getStatus,
+  taskLocality = binary.getTaskLocality,
+  speculative = binary.getSpeculative,
+  accumulatorUpdates = accumulatorUpdates.toSeq,
+  errorMessage = getOptional(binary.hasErrorMessage, 
binary.getErrorMessage),
+  hasMetrics = binary.getHasMetrics,
+  executorDeserializeTime = binary.getExecutorDeserializeTime,
+  executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTime,
+  executorRunTime = binary.getExe

[GitHub] [spark] kelvinjian-db opened a new pull request, #39064: [SPARK-41520] Split AND_OR TreePattern to separate AND and OR TreePatterns

2022-12-14 Thread GitBox


kelvinjian-db opened a new pull request, #39064:
URL: https://github.com/apache/spark/pull/39064

   
   
   ### What changes were proposed in this pull request?
   
   
   Removed `AND_OR` TreePattern, replaced with separate `AND` TreePattern and 
`OR` TreePattern.
   
   ### Why are the changes needed?
   
   
   This way we can be more fine-grained in how we match AND/OR patterns, since 
we often want to treat them separately.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No.
   
   ### How was this patch tested?
   
   
   No tests were added, as there are no existing unit tests for 
TreePatterns. Instead, I just made sure the existing relevant tests (e.g. 
`BooleanSimplificationSuite`) passed.
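
As a hedged illustration of the finer-grained matching described above, assuming the separate `OR` TreePattern value this PR proposes; the rewrite itself is a toy example.

```scala
// Sketch only: with separate OR / AND patterns, a rewrite can prune its
// traversal to expressions that actually contain an Or, without also being
// triggered by And. Assumes the TreePattern.OR value added in this PR.
import org.apache.spark.sql.catalyst.expressions.{Expression, Or}
import org.apache.spark.sql.catalyst.trees.TreePattern.OR

def flipOrOperands(e: Expression): Expression =
  e.transformWithPruning(_.containsPattern(OR)) {
    case Or(left, right) => Or(right, left) // toy rewrite for illustration
  }
```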
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #38956: [SPARK-41319][CONNECT][PYTHON] Implement Column.{when, otherwise} and Function `when` with `UnresolvedFunction`

2022-12-14 Thread GitBox


zhengruifeng commented on PR #38956:
URL: https://github.com/apache/spark/pull/38956#issuecomment-1352461291

   cc @HyukjinKwon @cloud-fan @amaliujia 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`

2022-12-14 Thread GitBox


zhengruifeng commented on PR #39035:
URL: https://github.com/apache/spark/pull/39035#issuecomment-1352460909

   merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #39035: [SPARK-41438][CONNECT][PYTHON] Implement `DataFrame.colRegex`

2022-12-14 Thread GitBox


zhengruifeng closed pull request #39035: [SPARK-41438][CONNECT][PYTHON] 
Implement `DataFrame.colRegex`
URL: https://github.com/apache/spark/pull/39035


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rangadi commented on a diff in pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-14 Thread GitBox


rangadi commented on code in PR #38922:
URL: https://github.com/apache/spark/pull/38922#discussion_r1049126074


##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
 parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed 
once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to 
be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If 
the not
+  // specified, it will default to -1, which disables recursive fields.

Review Comment:
   '-1' implies recursive fields are not allowed. 
   ("disables" does not clearly imply that it will be an error")



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
 parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed 
once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to 
be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If 
the not
+  // specified, it will default to -1, which disables recursive fields.
+  val circularReferenceDepth: Int = 
parameters.getOrElse("circularReferenceDepth", "-1").toInt

Review Comment:
   Suggestion for renaming this option:  _"recursive.fields.max.depth"_ 
   
   _circularReferenceDepth_ sounds very much like a code variable name. 
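
If the rename lands, usage could look roughly like the following hedged sketch; the option key is the suggestion above, and `df`, the message name, and the descriptor path are hypothetical.

```scala
// Sketch only: passing the suggested option key to from_protobuf.
// `df`, the message name, and the descriptor file path are made up.
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

val options = Map("recursive.fields.max.depth" -> "2").asJava
val parsed = df.select(
  from_protobuf(col("value"), "EventWithRecursion", "/tmp/events.desc", options)
    .alias("event"))
```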
   



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -92,14 +106,26 @@ object SchemaConverters {
 MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
 nullable = false))
   case MESSAGE =>
-if (existingRecordNames.contains(fd.getFullName)) {
+// Setting the circularReferenceDepth to 0 allows the field to be 
recursed once, setting
+// it to 1 allows it to be recursed twice, and setting it to 2 allows 
it to be recursed
+// thrice. circularReferenceDepth value greater than 2 is not allowed. 
If the not
+// specified, it will default to -1, which disables recursive fields.
+val recordName = fd.getMessageType.getFullName
+if (existingRecordNames.contains(recordName) &&
+  protobufOptions.circularReferenceDepth < 0 ) {
   throw 
QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
+} else if (existingRecordNames.contains(recordName) &&
+  existingRecordNames.getOrElse(recordName, 0)
+> protobufOptions.circularReferenceDepth) {
+  return Some(StructField(fd.getName, NullType, nullable = false))

Review Comment:
   Why is nullable false? 



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala:
##
@@ -38,6 +38,12 @@ private[sql] class ProtobufOptions(
 
   val parseMode: ParseMode =
 parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+
+  // Setting the circularReferenceDepth to 0 allows the field to be recursed 
once, setting
+  // it to 1 allows it to be recursed twice, and setting it to 2 allows it to 
be recursed
+  // thrice. circularReferenceDepth value greater than 2 is not allowed. If 
the not
+  // specified, it will default to -1, which disables recursive fields.

Review Comment:
   Also warn that if the protobuf record has more depth for recursive 
fields than allowed here, it will be truncated to the allowed depth. This 
implies some fields are discarded from the record. 
   
   Could you add a simple example in the comment showing the resulting Spark 
schema when this is set to '0' and '2'?



##
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala:
##
@@ -92,14 +106,26 @@ object SchemaConverters {
 MapType(keyType, valueType, valueContainsNull = 
false).defaultConcreteType,
 nullable = false))
   case MESSAGE =>
-if (existingRecordNames.contains(fd.getFullName)) {
+// Setting the circularReferenceDepth to 0 allows the field to be 
recursed once, setting
+// it to 1 allows it to be recursed twice, and setting it to 2 allows 
it to be recursed
+// thrice. circularReferenceDepth value greater than 2 is not allowed. 
If the not
+// specified, it will default to -1, which disables recursive fields.
+val recordName = fd.getMessageType.getFullName
+if (existingRecordNames.contains(recordName) &&

Review Comment:
   Better to remove the 'return' statement. 
   How about:
   
   val recursiveDepth = existingRecordNames.ge

[GitHub] [spark] allisonport-db commented on pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

2022-12-14 Thread GitBox


allisonport-db commented on PR #38823:
URL: https://github.com/apache/spark/pull/38823#issuecomment-1352455011

   > Does the SQL standard say anything about the restrictions of the generate 
expression? Can we allow `GENERATED AS rand()`? I think at least catalyst 
should have a rule to check the expression and make sure the data type is the 
same as column type.
   
   Checking for the data type makes sense.
   
   Other restrictions we can enforce (see the syntax sketch after this list):
   - no UDFs
   - only deterministic functions
   - no subqueries
   - no window functions
   - no aggregate functions
   - no generator functions
   - interdependence with other generated columns
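
For readers following along, a hedged sketch of the column syntax under discussion, using the `GENERATED ALWAYS AS` form quoted elsewhere in this thread; the catalog, table, and provider names are hypothetical.

```scala
// Sketch only: driving the proposed syntax through spark.sql.
// Catalog/table names and the USING provider are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.sql("""
  CREATE TABLE testcat.events (
    event_time TIMESTAMP,
    event_date DATE GENERATED ALWAYS AS (CAST(event_time AS DATE))
  ) USING foo
""")
```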


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049119941


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -195,6 +200,102 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 true
   }
 
+  /**
+   * Test async progress tracking capability with Kafka source and sink
+   */
+  test("async progress tracking") {
+val inputTopic = newTopic()
+testUtils.createTopic(inputTopic, partitions = 5)
+
+val dataSent = new ListBuffer[String]()
+testUtils.sendMessages(inputTopic, (0 until 15).map { case x =>
+  val m = s"foo-$x"
+  dataSent += m
+  m
+}.toArray, Some(0))
+
+val outputTopic = newTopic()
+testUtils.createTopic(outputTopic, partitions = 5)
+
+withTempDir { dir =>
+  val reader = spark
+.readStream
+.format("kafka")
+.option("kafka.bootstrap.servers", testUtils.brokerAddress)
+.option("kafka.metadata.max.age.ms", "1")
+.option("maxOffsetsPerTrigger", 5)
+.option("subscribe", inputTopic)
+.option("startingOffsets", "earliest")
+.load()
+
+  def startQuery(): StreamingQuery = {
+reader.writeStream
+  .format("kafka")
+  .option("checkpointLocation", dir.getCanonicalPath)
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.max.block.ms", "5000")
+  .option("topic", outputTopic)
+  .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+  .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 1000)
+  .queryName("kafkaStream")
+  .start()
+  }
+
+  def readResults(): ListBuffer[String] = {

Review Comment:
   The output would be the same, but the code and the actual execution would be 
much simpler with a batch query. See the code below for the batch version:
   
   ```
   spark.read
 .format("kafka")
 .option("kafka.bootstrap.servers", testUtils.brokerAddress)
 .option("startingOffsets", "earliest")
 .option("subscribe", outputTopic)
 .load()
 .select("CAST(value AS string)")
 .toDS()
 .collect()
 .map(_._1)
   ```
   
   The entire code in the method can be replaced with this query. I haven't given 
it a try, but the actual code that ends up executing won't be much different.
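
A hedged, self-contained variant of that sketch (assuming the surrounding test class quoted above, i.e. `testUtils`, `outputTopic`, and `testImplicits` are in scope):

```scala
// Sketch only: batch read of everything written to the output topic,
// roughly equivalent to the streaming readResults() helper above.
import testImplicits._

val results: Array[String] = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  .option("startingOffsets", "earliest")
  .option("subscribe", outputTopic)
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .collect()
```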



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] rangadi commented on a diff in pull request #39039: [SPARK-40776][SQL][PROTOBUF][DOCS] Spark-Protobuf docs

2022-12-14 Thread GitBox


rangadi commented on code in PR #39039:
URL: https://github.com/apache/spark/pull/39039#discussion_r1049120692


##
docs/sql-data-sources-protobuf.md:
##
@@ -0,0 +1,301 @@
+---

Review Comment:
   Could you check the formatting? It seems to require fixes. 
   A sample from the GitHub view of this page is below; it is not rendered correctly.
   https://user-images.githubusercontent.com/502522/207749158-8990d41c-4d60-411d-913f-232b432df2de.png
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049119941


##
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##
@@ -195,6 +200,102 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
 true
   }
 
+  /**
+   * Test async progress tracking capability with Kafka source and sink
+   */
+  test("async progress tracking") {
+val inputTopic = newTopic()
+testUtils.createTopic(inputTopic, partitions = 5)
+
+val dataSent = new ListBuffer[String]()
+testUtils.sendMessages(inputTopic, (0 until 15).map { case x =>
+  val m = s"foo-$x"
+  dataSent += m
+  m
+}.toArray, Some(0))
+
+val outputTopic = newTopic()
+testUtils.createTopic(outputTopic, partitions = 5)
+
+withTempDir { dir =>
+  val reader = spark
+.readStream
+.format("kafka")
+.option("kafka.bootstrap.servers", testUtils.brokerAddress)
+.option("kafka.metadata.max.age.ms", "1")
+.option("maxOffsetsPerTrigger", 5)
+.option("subscribe", inputTopic)
+.option("startingOffsets", "earliest")
+.load()
+
+  def startQuery(): StreamingQuery = {
+reader.writeStream
+  .format("kafka")
+  .option("checkpointLocation", dir.getCanonicalPath)
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.max.block.ms", "5000")
+  .option("topic", outputTopic)
+  .option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+  .option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 1000)
+  .queryName("kafkaStream")
+  .start()
+  }
+
+  def readResults(): ListBuffer[String] = {

Review Comment:
   The output would be the same, but the code and the actual execution would be 
much simpler with a batch query. See the code below for the batch version:
   
   ```
   val data = spark.read.format("kafka")...load().select("CAST(value AS 
string)").toDS().collect().map(_._1)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

2022-12-14 Thread GitBox


allisonport-db commented on code in PR #38823:
URL: https://github.com/apache/spark/pull/38823#discussion_r1049104281


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java:
##
@@ -93,4 +93,18 @@ default Transform[] 
inferPartitioning(CaseInsensitiveStringMap options) {
   default boolean supportsExternalMetadata() {
 return false;
   }
+
+  /**
+   * Returns true if the source supports defining generated columns upon table 
creation in SQL.
+   * When false: any create/replace table statements with a generated column 
defined in the table
+   * schema will throw an exception during analysis.
+   *
+   * A generated column is defined with syntax: {@code colName colType 
GENERATED ALWAYS AS (expr)}
+   * The generation expression is stored in the column metadata with key 
"generationExpression".
+   *
+   * Override this method to allow defining generated columns in 
create/replace table statements.
+   */
+  default boolean supportsGeneratedColumnsOnCreation() {

Review Comment:
   @sunchao `TableCapability` has the same issue described above as including 
it in `Table` does.
   @RussellSpitzer I think it would be up to the implementing data source to 
restrict what's supported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049102400


##
core/src/main/scala/org/apache/spark/util/ThreadUtils.scala:
##
@@ -167,6 +167,27 @@ private[spark] object ThreadUtils {
 Executors.newFixedThreadPool(1, 
threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Wrapper over newSingleThreadExecutor that allows the specification
+   * of a RejectedExecutionHandler
+   */
+  def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+threadName: String,

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

2022-12-14 Thread GitBox


allisonport-db commented on code in PR #38823:
URL: https://github.com/apache/spark/pull/38823#discussion_r1049102169


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
##
@@ -171,6 +171,12 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
   val (storageFormat, provider) = getStorageFormatAndProvider(
 c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, 
c.tableSpec.serde,
 ctas = false)
+
+  if (GeneratedColumn.hasGeneratedColumns(c.tableSchema) &&

Review Comment:
   Is there a reason why we match `ResolvedV1Identifier` here instead of 
`ResolvedV1TableIdentifier`?
   
   I think we would want to add the API to `TableCatalog` and not 
`CatalogPlugin`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonport-db commented on a diff in pull request #38823: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

2022-12-14 Thread GitBox


allisonport-db commented on code in PR #38823:
URL: https://github.com/apache/spark/pull/38823#discussion_r1049100728


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableProvider.java:
##
@@ -93,4 +93,18 @@ default Transform[] 
inferPartitioning(CaseInsensitiveStringMap options) {
   default boolean supportsExternalMetadata() {
 return false;
   }
+
+  /**
+   * Returns true if the source supports defining generated columns upon table 
creation in SQL.
+   * When false: any create/replace table statements with a generated column 
defined in the table
+   * schema will throw an exception during analysis.
+   *
+   * A generated column is defined with syntax: {@code colName colType 
GENERATED ALWAYS AS (expr)}
+   * The generation expression is stored in the column metadata with key 
"generationExpression".
+   *
+   * Override this method to allow defining generated columns in 
create/replace table statements.
+   */
+  default boolean supportsGeneratedColumnsOnCreation() {

Review Comment:
   Couldn't this be an issue, though, if we add the syntax to another statement 
(for example ALTER TABLE ADD COLUMN) and the catalog/data source (whatever we 
land on) hasn't updated its implementation yet to accommodate it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049023076


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049099509


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+
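
For readers skimming the digest, the hunk quoted above boils down to two write-side options on the streaming sink. The snippet below is a minimal, self-contained sketch of that usage and is not part of the patch itself: the rate source, the table name "sketch_output", the row rate, and the timeout are illustrative assumptions, and it presumes a Spark build that already contains the change under review. The option keys mirror the constants asyncProgressTrackingEnabled and asyncProgressTrackingCheckpointIntervalMs quoted later in this thread.

    import java.nio.file.Files
    import org.apache.spark.sql.SparkSession

    object AsyncProgressTrackingSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("async-progress-tracking-sketch")
          .getOrCreate()

        // Throwaway checkpoint directory, analogous to the temp dir in the quoted test.
        val checkpointDir = Files.createTempDirectory("sketch-checkpoint").toString

        val query = spark.readStream
          .format("rate")                  // built-in source emitting (timestamp, value) rows
          .option("rowsPerSecond", "5")
          .load()
          .writeStream
          .format("memory")                // memory sink, as in the quoted happy-path test
          .queryName("sketch_output")
          // Opt in to async progress tracking and set the checkpointing interval.
          .option("asyncProgressTrackingEnabled", "true")
          .option("asyncProgressTrackingCheckpointIntervalMs", "1000")
          .option("checkpointLocation", checkpointDir)
          .start()

        query.awaitTermination(10000)      // let a few micro-batches run
        query.stop()
        spark.stop()
      }
    }

With a non-zero interval, offset and commit log entries are persisted at most once per interval rather than once per micro-batch; the quoted test sets the interval to 0, presumably so that every batch's progress is written before the results are checked.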

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049096226


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049095786


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] github-actions[bot] commented on pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

2022-12-14 Thread GitBox


github-actions[bot] commented on PR #37711:
URL: https://github.com/apache/spark/pull/37711#issuecomment-1352393959

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049095149


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049094888


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049093714


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049093235


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049092707


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] HyukjinKwon closed pull request #39056: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept `tuple` and `dict`

2022-12-14 Thread GitBox


HyukjinKwon closed pull request #39056: 
[SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept 
`tuple` and `dict`
URL: https://github.com/apache/spark/pull/39056


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #39056: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Function `lit` should not accept `tuple` and `dict`

2022-12-14 Thread GitBox


HyukjinKwon commented on PR #39056:
URL: https://github.com/apache/spark/pull/39056#issuecomment-1352388686

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #39060: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null

2022-12-14 Thread GitBox


HyukjinKwon closed pull request #39060: 
[SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null
URL: https://github.com/apache/spark/pull/39060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #39060: [SPARK-41506][CONNECT][PYTHON][FOLLOWUP] Support typed null

2022-12-14 Thread GitBox


HyukjinKwon commented on PR #39060:
URL: https://github.com/apache/spark/pull/39060#issuecomment-1352384703

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049083624


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+  extraOptions: Map[String, String]): Long = {
+extraOptions
+  .getOrElse(
+AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+"1000"
+  )
+  .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+sparkSession: SparkSession,
+trigger: Trigger,
+triggerClock: Clock,
+extraOptions: Map[String, String],
+plan: WriteToStream)
+extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+.getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+"async-log-write",
+2, // one for offset commit and one for completion commit
+new RejectedExecutionHandler() {
+  override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+try {
+  if (!executor.isShutdown) {
+val start = System.currentTimeMillis()
+executor.getQueue.put(r)
+logDebug(
+  s"Async write paused execution for " +
+s"${System.currentTimeMillis() - start} due to task queue being full."
+)
+  }
+} catch {
+  case e: InterruptedException =>
+Thread.currentThread.interrupt()
+throw new RejectedExecutionException("Producer interrupted", e)
+  case e: Throwable =>
+logError("Encountered error in async write executor service", e)
+errorNotifier.markError(e)
+}
+  }
+})
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+sparkSession,
+checkpointFile("offsets"),
+asyncWritesExecutorService,
+asyncProgressTrackingCheckpointingIntervalMs,
+clock = triggerClock
+  )
+
+  override val commitLog =
+new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+// check if pipeline is stateful
+checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+// this is a no op for async progress tracking since we only want to commit sources only
+// after the offse
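
The rejected-execution handler in the hunk above implements a standard back-pressure idiom: when the single writer thread's bounded queue is full, the submitting thread blocks on executor.getQueue.put(r) instead of dropping the write. Below is a stripped-down, Spark-free sketch of the same idiom; the class and object names are illustrative assumptions, and it uses only java.util.concurrent types rather than the ThreadUtils helper from the quoted code.

    import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}

    // Blocks the caller until the bounded queue has room, mirroring the
    // behaviour of the handler in the quoted AsyncProgressTrackingMicroBatchExecution.
    class BlockOnFullQueueHandler extends RejectedExecutionHandler {
      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
        try {
          if (!executor.isShutdown) {
            executor.getQueue.put(r)   // wait for capacity instead of failing fast
          }
        } catch {
          case e: InterruptedException =>
            Thread.currentThread().interrupt()
            throw new RejectedExecutionException("Interrupted while enqueueing task", e)
        }
      }
    }

    object BlockingHandlerDemo {
      def main(args: Array[String]): Unit = {
        val pool = new ThreadPoolExecutor(
          1, 1, 0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue[Runnable](2),   // small bounded queue to force rejections
          new BlockOnFullQueueHandler)

        (1 to 10).foreach { i =>
          pool.execute(() => {
            Thread.sleep(100)                    // simulate a slow offset-log write
            println(s"task $i done")
          })
        }
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
      }
    }

As in the quoted code, the isShutdown check narrows but does not fully close the window in which a task could be enqueued after shutdown; the Spark handler additionally routes unexpected failures to its errorNotifier so the stream can surface them.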

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049079507


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049079011


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+
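
The hunk above is cut off in this archive before the body of the foreachBatch sink. For reference only, here is a minimal Scala sketch of the producer/consumer handshake that the CountDownLatch and Semaphore declared in the quoted snippet are meant to implement; the sink body, names, and counts below are illustrative assumptions written for a test body like the one quoted (reusing its `ds`, `inputData`, `checkpointLocation`, and imports), not the PR's actual code.

    import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}

    // Illustrative handshake: the producer emits data for one micro-batch at a
    // time, and the sink signals back after consuming each batch.
    val numBatches = 10
    val batchesCompleted = new CountDownLatch(numBatches)  // counts consumed batches
    val nextBatchAllowed = new Semaphore(1)                 // gates the producer

    val query = ds.writeStream
      .foreachBatch((batchDf: Dataset[Row], batchId: Long) => {
        batchDf.collect()             // consume the micro-batch
        batchesCompleted.countDown()  // record that one more batch finished
        nextBatchAllowed.release()    // let the producer emit the next batch
      })
      .option("checkpointLocation", checkpointLocation)
      .start()

    (0 until numBatches).foreach { i =>
      nextBatchAllowed.acquire()  // wait until the previous batch was consumed
      inputData.addData(i)        // produce data for exactly one micro-batch
    }
    assert(batchesCompleted.await(60, TimeUnit.SECONDS))
    query.stop()

Because each addData call happens only after the previous batch has been consumed, the query produces exactly one micro-batch per datum, which is what lets such a test generate and read a known number of batches.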

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049077958


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049076380


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049075731


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049075469


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049074121


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049073555


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049071928


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049071823


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049071577


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049070974


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049070725


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049070590


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049070447


##
core/src/main/scala/org/apache/spark/util/ThreadUtils.scala:
##
@@ -167,6 +167,27 @@ private[spark] object ThreadUtils {
 Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Wrapper over newSingleThreadExecutor that allows the specification
+   * of a RejectedExecutionHandler
+   */
+  def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+threadName: String,

Review Comment:
   Not really. Please note that params in a method call and params in a method definition have different indentation: 2 spaces for the former, 4 spaces for the latter.
   https://github.com/databricks/scala-style-guide#indent
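   For illustration, a minimal sketch of how that guideline reads in practice, using the wrapper described in the doc comment above. The second parameter name, the body, and the call-site values are assumptions made for this example only; they are not taken from the PR.

// Definition site: parameters indented 4 spaces.
// `handler` and the body below are illustrative assumptions, not the PR's code.
import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionHandler, ThreadFactory, ThreadPoolExecutor, TimeUnit}

def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
    threadName: String,
    handler: RejectedExecutionHandler): ThreadPoolExecutor = {
  // Build a daemon thread factory with the requested thread name.
  val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, threadName)
      t.setDaemon(true)
      t
    }
  }
  // Single-threaded pool that delegates rejected tasks to the supplied handler.
  new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue[Runnable](), threadFactory, handler)
}

// Call site: arguments indented 2 spaces.
val executor = newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
  "async-progress-tracking",
  new ThreadPoolExecutor.DiscardPolicy())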
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049069404


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049032417


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-14 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049028853


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
