Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on PR #45744:
URL: https://github.com/apache/spark/pull/45744#issuecomment-2024460329

   @cloud-fan 
   I'm very sorry, I broke the branch corresponding to the previously reviewed 
PR. 
   Can I use this new PR to submit this feature?
   
   The reviewed PR: https://github.com/apache/spark/pull/45714


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] Refine docstring of `try_sum`, `try_avg`, `avg`, `sum`, `mean` [spark]

2024-03-27 Thread via GitHub


HyukjinKwon opened a new pull request, #45745:
URL: https://github.com/apache/spark/pull/45745

   ### What changes were proposed in this pull request?
   
   This PR refines docstring of  `try_sum`, `try_avg`, `avg`, `sum`, `mean`  
with more descriptive examples.
   
   ### Why are the changes needed?
   
   For better API reference documentation.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, it fixes user-facing documentation.
   
   ### How was this patch tested?
   
   Manually tested. GitHub Actions should verify them.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] Parse json code generator new [spark]

2024-03-27 Thread via GitHub


panbingkun opened a new pull request, #45744:
URL: https://github.com/apache/spark/pull/45744

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

2024-03-27 Thread via GitHub


HeartSaVioR commented on PR #45467:
URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453733

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

2024-03-27 Thread via GitHub


HeartSaVioR closed pull request #45467: [SPARK-47363][SS] Initial State without 
state reader implementation for State API v2.
URL: https://github.com/apache/spark/pull/45467


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

2024-03-27 Thread via GitHub


HeartSaVioR commented on PR #45467:
URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453641

   CI failure isn't related - only pyspark-connect failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Test scala-maven-plugin 4.9.1 [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on code in PR #45716:
URL: https://github.com/apache/spark/pull/45716#discussion_r1542333424


##
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:
##
@@ -68,8 +67,8 @@ private[spark] object SerializationDebugger extends Logging {
   }
 
   private[serializer] var enableDebugging: Boolean = {
-!AccessController.doPrivileged(new sun.security.action.GetBooleanAction(
-  "sun.io.serialization.extendedDebugInfo")).booleanValue()
+!sun.security.action.GetBooleanAction
+  .privilegedGetProperty("sun.io.serialization.extendedDebugInfo")

Review Comment:
   try fix
   
   ```
   
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71:
 object security is not a member of package sun
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Test scala-maven-plugin 4.9.1 [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on code in PR #45716:
URL: https://github.com/apache/spark/pull/45716#discussion_r1542333284


##
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:
##
@@ -68,8 +67,8 @@ private[spark] object SerializationDebugger extends Logging {
   }
 
   private[serializer] var enableDebugging: Boolean = {
-!AccessController.doPrivileged(new sun.security.action.GetBooleanAction(
-  "sun.io.serialization.extendedDebugInfo")).booleanValue()
+!sun.security.action.GetBooleanAction

Review Comment:
   try fix 
   
   ```
   
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71:
 object security is not a member of package sun
   ```



##
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:
##
@@ -68,8 +67,8 @@ private[spark] object SerializationDebugger extends Logging {
   }
 
   private[serializer] var enableDebugging: Boolean = {
-!AccessController.doPrivileged(new sun.security.action.GetBooleanAction(
-  "sun.io.serialization.extendedDebugInfo")).booleanValue()
+!sun.security.action.GetBooleanAction

Review Comment:
   try fix 
   
   ```
   
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71:
 object security is not a member of package sun
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Test scala-maven-plugin 4.9.1 [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on code in PR #45716:
URL: https://github.com/apache/spark/pull/45716#discussion_r1542333424


##
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:
##
@@ -68,8 +67,8 @@ private[spark] object SerializationDebugger extends Logging {
   }
 
   private[serializer] var enableDebugging: Boolean = {
-!AccessController.doPrivileged(new sun.security.action.GetBooleanAction(
-  "sun.io.serialization.extendedDebugInfo")).booleanValue()
+!sun.security.action.GetBooleanAction
+  .privilegedGetProperty("sun.io.serialization.extendedDebugInfo")

Review Comment:
   try fix
   
   ```
   
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71:
 object security is not a member of package sun
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542312888


##
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant;
+
+import scala.util.control.NonFatal;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.BadRecordException;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantUtil;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.apache.spark.types.variant.VariantSizeLimitException;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.types.VariantVal;
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+public class VariantExpressionEvalUtils {
+
+  public static VariantVal parseJson(UTF8String input) {
+try {
+  Variant v = VariantBuilder.parseJson(input.toString());
+  return new VariantVal(v.getValue(), v.getMetadata());
+} catch (VariantSizeLimitException e) {
+  throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, 
"parse_json");
+} catch (Throwable throwable) {
+  if (NonFatal.apply(throwable)) {
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+input.toString(),
+new BadRecordException(() -> input, () -> new InternalRow[0], 
throwable));
+  }
+  throw new Error(throwable);

Review Comment:
   Okay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

2024-03-27 Thread via GitHub


attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542308491


##
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", 
"remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-  Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = 
this.synchronized {
+if (conf != null && 
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+  if (fallbackStorage.isDefined) {
+val fallbackPath = 
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
+if (fallbackPath.equals(fallbackStorage.get.fallbackPath.toString)) {
+  logDebug(s"FallbackStorage defined with path $fallbackPath")
+  fallbackStorage
+} else {
+  // for unit test.
+  Some(new FallbackStorage(conf))
+}
+  } else {
+fallbackStorage = Some(new FallbackStorage(conf))
+logInfo(s"Created FallbackStorage $fallbackStorage")
+fallbackStorage
+  }
 } else {
   None
 }
   }
 
+  def getNumReadThreads(conf: SparkConf): Int = {
+val numShuffleThreads =
+  if (conf == null) None else 
conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ)

Review Comment:
   Same here:
   Why the null checks for `conf`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

2024-03-27 Thread via GitHub


attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542308319


##
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", 
"remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-  Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = 
this.synchronized {
+if (conf != null && 
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {

Review Comment:
   Why the null checks for `conf`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

2024-03-27 Thread via GitHub


attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911


##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
 var pushMergedLocalBlockBytes = 0L
 val prevNumBlocksToFetch = numBlocksToFetch
 
-val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+// Fallback to original implementation, if thread pool is not enabled.
+val localExecIds = if 
(FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @maheshk114 can you please explain why this change is needed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

2024-03-27 Thread via GitHub


attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911


##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
 var pushMergedLocalBlockBytes = 0L
 val prevNumBlocksToFetch = numBlocksToFetch
 
-val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+// Fallback to original implementation, if thread pool is not enabled.
+val localExecIds = if 
(FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @maheshk114 can you explain why this change is needed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]

2024-03-27 Thread via GitHub


HyukjinKwon closed pull request #45742: [SPARK-47619][PYTHON][DOCS] Refine 
docstring of `to_json/from_json`
URL: https://github.com/apache/spark/pull/45742


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on PR #45742:
URL: https://github.com/apache/spark/pull/45742#issuecomment-2024386921

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47620][PYTHON][CONNECT] Add a helper function to sort columns [spark]

2024-03-27 Thread via GitHub


zhengruifeng commented on PR #45743:
URL: https://github.com/apache/spark/pull/45743#issuecomment-2024369477

   merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47620][PYTHON][CONNECT] Add a helper function to sort columns [spark]

2024-03-27 Thread via GitHub


zhengruifeng closed pull request #45743: [SPARK-47620][PYTHON][CONNECT] Add a 
helper function to sort columns
URL: https://github.com/apache/spark/pull/45743


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47614][CORE] Rename `JavaModuleOptions` to `JVMRuntimeOptions` [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45735:
URL: https://github.com/apache/spark/pull/45735#discussion_r1542278308


##
launcher/src/main/java/org/apache/spark/launcher/JVMRuntimeOptions.java:
##
@@ -18,14 +18,14 @@
 package org.apache.spark.launcher;
 
 /**
- * This helper class is used to place the all `--add-opens` options
- * required by Spark when using Java 17. `DEFAULT_MODULE_OPTIONS` has added
+ * This helper class is used to place some JVM runtime options(eg: 
`--add-opens`)
+ * required by Spark when using Java 17. `DEFAULT_OPTIONS` has added
  * `-XX:+IgnoreUnrecognizedVMOptions` to be robust.
  *
  * @since 3.3.0

Review Comment:
   If this is a public API, let's not touch it. We can correct the doc comments 
though. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542277307


##
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant;
+
+import scala.util.control.NonFatal;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.BadRecordException;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantUtil;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.apache.spark.types.variant.VariantSizeLimitException;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.types.VariantVal;
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+public class VariantExpressionEvalUtils {
+
+  public static VariantVal parseJson(UTF8String input) {
+try {
+  Variant v = VariantBuilder.parseJson(input.toString());
+  return new VariantVal(v.getValue(), v.getMetadata());
+} catch (VariantSizeLimitException e) {
+  throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, 
"parse_json");
+} catch (Throwable throwable) {
+  if (NonFatal.apply(throwable)) {
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+input.toString(),
+new BadRecordException(() -> input, () -> new InternalRow[0], 
throwable));
+  }
+  throw new Error(throwable);

Review Comment:
   Sorry I missed the `NonFatal` part. Then using Scala is better as it's a bit 
weird to simulate NonFatal in Java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45150:
URL: https://github.com/apache/spark/pull/45150#discussion_r1542275698


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -435,6 +438,16 @@ message ExecutePlanResponse {
 // the execution is complete. If the server sends onComplete without 
sending a ResultComplete,
 // it means that there is more, and the client should use ReattachExecute 
RPC to continue.
   }
+
+  // This message is used to communicate progress about the query progress 
during the execution.
+  message ExecutionProgress {
+int64 num_tasks = 1;
+int64 num_completed_tasks = 2;

Review Comment:
   After a second thought, it's better to hide Spark internals (stages) to end 
users, and eventually we should only have one progress bar for the query. So 
the current PR is a good starting point.
   
   However, this server-client protocol needs to be stable and we don't want to 
change the client frequently to improve the progress reporting. Can we define a 
minimum set of information we need to send to the client side to display the 
progress bar? I feel it's better to calculate the percentage at the server side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542275472


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.util.BadRecordException
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.types.variant.{VariantBuilder, 
VariantSizeLimitException, VariantUtil}
+import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+object VariantExpressionEvalUtils {
+
+  def parseJson(input: UTF8String): VariantVal = {
+try {
+  val v = VariantBuilder.parseJson(input.toString)
+  new VariantVal(v.getValue, v.getMetadata)
+} catch {
+  case _: VariantSizeLimitException =>
+throw 
QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")
+  case NonFatal(e) =>
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(

Review Comment:
   > can we make these two methods return `SparkRuntimeException`? Then we can 
write the code in Java.
   
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542275365


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.util.BadRecordException
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.types.variant.{VariantBuilder, 
VariantSizeLimitException, VariantUtil}
+import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+object VariantExpressionEvalUtils {
+
+  def parseJson(input: UTF8String): VariantVal = {
+try {
+  val v = VariantBuilder.parseJson(input.toString)
+  new VariantVal(v.getValue, v.getMetadata)
+} catch {
+  case _: VariantSizeLimitException =>
+throw 
QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")
+  case NonFatal(e) =>
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(

Review Comment:
   The scala version of `VariantExpressionEvalUtils` seen in `decompilation`, 
as above.



##
sql/core/src/test/scala/org/apache/spark/sql/VariantFunctionSuite.scala:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{CreateArray, 
CreateNamedStruct, Literal, StructsToJson}
+import org.apache.spark.sql.catalyst.expressions.variant.ParseJson
+import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.types.variant.VariantBuilder
+import org.apache.spark.unsafe.types.VariantVal
+
+class VariantFunctionSuite extends QueryTest with SharedSparkSession {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024345469

   Thank you for review, @yaooqinn and @viirya . 
   
   I might overlook the other object storage side-effect cases. I'll convert 
this PR to `Draft` and test with Google Cloud Storage at least.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542274672


##
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant;
+
+import scala.util.control.NonFatal;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.BadRecordException;
+import org.apache.spark.sql.errors.QueryExecutionErrors;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantUtil;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.apache.spark.types.variant.VariantSizeLimitException;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.types.VariantVal;
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+public class VariantExpressionEvalUtils {
+
+  public static VariantVal parseJson(UTF8String input) {
+try {
+  Variant v = VariantBuilder.parseJson(input.toString());
+  return new VariantVal(v.getValue(), v.getMetadata());
+} catch (VariantSizeLimitException e) {
+  throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, 
"parse_json");
+} catch (Throwable throwable) {
+  if (NonFatal.apply(throwable)) {
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
+input.toString(),
+new BadRecordException(() -> input, () -> new InternalRow[0], 
throwable));
+  }
+  throw new Error(throwable);

Review Comment:
   Although there are some `differences` in the logic seen when `decompiling` 
Scala, I think `throwing an Error` is a good compromise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Test scala-maven-plugin 4.9.1 [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on code in PR #45716:
URL: https://github.com/apache/spark/pull/45716#discussion_r1542270189


##
pom.xml:
##
@@ -176,7 +176,7 @@
 2.13
 2.2.0
 
-4.7.1
+4.8.1

Review Comment:
   re-test 4.8.1 first



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]

2024-03-27 Thread via GitHub


cloud-fan closed pull request #45703: [SPARK-47546][SQL] Improve validation 
when reading Variant from Parquet
URL: https://github.com/apache/spark/pull/45703


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on PR #45703:
URL: https://github.com/apache/spark/pull/45703#issuecomment-2024338004

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on code in PR #45740:
URL: https://github.com/apache/spark/pull/45740#discussion_r1542269102


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -420,6 +420,10 @@ class SparkContext(config: SparkConf) extends Logging {
 // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s
 // We can remove this after Apache Hadoop 3.4.1 releases
 conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", 
"30s")
+// Enable Magic Committer by default for all S3 buckets if hadoop-cloud 
module exists
+if 
(Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"))
 {
+  
conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", 
"true")
+}

Review Comment:
   Yes, at the first commit test on this PR, CI shows the following.
   - 
https://github.com/dongjoon-hyun/spark/actions/runs/8458713094/job/23173570901
   ```
   [info] - SPARK-23731 plans should be canonicalizable after being 
(de)serialized *** FAILED *** (53 milliseconds)
   [info]   java.lang.ClassNotFoundException: 
org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
   [info]   at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
   [info]   at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
   [info]   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
   [info]   at java.base/java.lang.Class.forName0(Native Method)
   [info]   at java.base/java.lang.Class.forName(Class.java:467)
   [info]   at 
org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41)
   [info]   at 
org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36)
   [info]   at org.apache.spark.util.Utils$.classForName(Utils.scala:97)
   [info]   at 
org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:213)
   [info]   at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45649:
URL: https://github.com/apache/spark/pull/45649#discussion_r1542268213


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala:
##
@@ -35,12 +35,84 @@ case class With(child: Expression, defs: 
Seq[CommonExpressionDef])
   newChildren: IndexedSeq[Expression]): Expression = {
 copy(child = newChildren.head, defs = 
newChildren.tail.map(_.asInstanceOf[CommonExpressionDef]))
   }
+
+  /**
+   * Builds a map of ids (originally assigned ids -> canonicalized ids) to be 
re-assigned during
+   * canonicalization.
+   */
+  protected lazy val canonicalizationIdMap: Map[Long, Long] = {
+// Start numbering after taking into account all nested With expression id 
maps.
+var currentId = child.map {
+  case w: With => w.canonicalizationIdMap.size
+  case _ => 0L
+}.sum
+defs.map { d =>
+  currentId += 1
+  d.id.id -> currentId
+}.toMap
+  }
+
+  /**
+   * Canonicalize by re-assigning all ids in CommonExpressionRef's and 
CommonExpressionDef's
+   * starting from 0. This uses [[canonicalizationIdMap]], which contains all 
mappings for
+   * CommonExpressionDef's defined in this scope.
+   * Note that this takes into account nested With expressions by sharing a 
numbering scope (see
+   * [[canonicalizationIdMap]].
+   */
+  override lazy val canonicalized: Expression = copy(
+child = child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+  case r: CommonExpressionRef if !r.id.canonicalized =>
+r.copy(id = r.id.canonicalize(canonicalizationIdMap))
+}.canonicalized,
+defs = defs.map {
+  case d: CommonExpressionDef if !d.id.canonicalized =>
+d.copy(id = d.id.canonicalize(canonicalizationIdMap)).canonicalized
+  .asInstanceOf[CommonExpressionDef]
+  case d => d.canonicalized.asInstanceOf[CommonExpressionDef]
+}
+  )
+}
+
+object With {
+  /**
+   * Helper function to create a [[With]] statement with an arbitrary number 
of common expressions.
+   * Note that the number of arguments in `commonExprs` should be the same as 
the number of
+   * arguments taken by `replaced`.
+   *
+   * @param commonExprs list of common expressions
+   * @param replacedclosure that defines the common expressions in the 
main expression
+   * @return the expression returned by replaced with its arguments replaced 
by commonExprs in order
+   */
+  def apply(commonExprs: Expression*)(replaced: Seq[Expression] => 
Expression): With = {
+val commonExprDefs = commonExprs.map(CommonExpressionDef(_))
+val commonExprRefs = commonExprDefs.map(new CommonExpressionRef(_))
+With(replaced(commonExprRefs), commonExprDefs)
+  }
+}
+
+case class CommonExpressionId(id: Long = CommonExpressionId.newId, 
canonicalized: Boolean = false) {

Review Comment:
   In `QueryPlan` we have this
   ```
 /**
  * A private mutable variable to indicate whether this plan is the result 
of canonicalization.
  * This is used solely for making sure we wouldn't execute a canonicalized 
plan.
  * See [[canonicalized]] on how this is set.
  */
 @transient private var _isCanonicalizedPlan: Boolean = false
   
 protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
   ```
   
   Shall we do the same in `With`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45649:
URL: https://github.com/apache/spark/pull/45649#discussion_r1542268213


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala:
##
@@ -35,12 +35,84 @@ case class With(child: Expression, defs: 
Seq[CommonExpressionDef])
   newChildren: IndexedSeq[Expression]): Expression = {
 copy(child = newChildren.head, defs = 
newChildren.tail.map(_.asInstanceOf[CommonExpressionDef]))
   }
+
+  /**
+   * Builds a map of ids (originally assigned ids -> canonicalized ids) to be 
re-assigned during
+   * canonicalization.
+   */
+  protected lazy val canonicalizationIdMap: Map[Long, Long] = {
+// Start numbering after taking into account all nested With expression id 
maps.
+var currentId = child.map {
+  case w: With => w.canonicalizationIdMap.size
+  case _ => 0L
+}.sum
+defs.map { d =>
+  currentId += 1
+  d.id.id -> currentId
+}.toMap
+  }
+
+  /**
+   * Canonicalize by re-assigning all ids in CommonExpressionRef's and 
CommonExpressionDef's
+   * starting from 0. This uses [[canonicalizationIdMap]], which contains all 
mappings for
+   * CommonExpressionDef's defined in this scope.
+   * Note that this takes into account nested With expressions by sharing a 
numbering scope (see
+   * [[canonicalizationIdMap]].
+   */
+  override lazy val canonicalized: Expression = copy(
+child = child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+  case r: CommonExpressionRef if !r.id.canonicalized =>
+r.copy(id = r.id.canonicalize(canonicalizationIdMap))
+}.canonicalized,
+defs = defs.map {
+  case d: CommonExpressionDef if !d.id.canonicalized =>
+d.copy(id = d.id.canonicalize(canonicalizationIdMap)).canonicalized
+  .asInstanceOf[CommonExpressionDef]
+  case d => d.canonicalized.asInstanceOf[CommonExpressionDef]
+}
+  )
+}
+
+object With {
+  /**
+   * Helper function to create a [[With]] statement with an arbitrary number 
of common expressions.
+   * Note that the number of arguments in `commonExprs` should be the same as 
the number of
+   * arguments taken by `replaced`.
+   *
+   * @param commonExprs list of common expressions
+   * @param replacedclosure that defines the common expressions in the 
main expression
+   * @return the expression returned by replaced with its arguments replaced 
by commonExprs in order
+   */
+  def apply(commonExprs: Expression*)(replaced: Seq[Expression] => 
Expression): With = {
+val commonExprDefs = commonExprs.map(CommonExpressionDef(_))
+val commonExprRefs = commonExprDefs.map(new CommonExpressionRef(_))
+With(replaced(commonExprRefs), commonExprDefs)
+  }
+}
+
+case class CommonExpressionId(id: Long = CommonExpressionId.newId, 
canonicalized: Boolean = false) {

Review Comment:
   In query plan we have this
   ```
 /**
  * A private mutable variable to indicate whether this plan is the result 
of canonicalization.
  * This is used solely for making sure we wouldn't execute a canonicalized 
plan.
  * See [[canonicalized]] on how this is set.
  */
 @transient private var _isCanonicalizedPlan: Boolean = false
   
 protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan
   ```
   
   Shall we do the same in `With`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on PR #45742:
URL: https://github.com/apache/spark/pull/45742#issuecomment-2024333559

   cc @zhengruifeng 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]

2024-03-27 Thread via GitHub


HyukjinKwon opened a new pull request, #45742:
URL: https://github.com/apache/spark/pull/45742

   ### What changes were proposed in this pull request?
   
   This PR refines docstring of  `to_json/from_json`  with more descriptive 
examples.
   
   ### Why are the changes needed?
   
   For better API reference documentation.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, it fixes user-facing documentation.
   
   ### How was this patch tested?
   
   Manually tested. GitHub Actions should verify them.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45649:
URL: https://github.com/apache/spark/pull/45649#discussion_r1542266859


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala:
##
@@ -35,12 +35,84 @@ case class With(child: Expression, defs: 
Seq[CommonExpressionDef])
   newChildren: IndexedSeq[Expression]): Expression = {
 copy(child = newChildren.head, defs = 
newChildren.tail.map(_.asInstanceOf[CommonExpressionDef]))
   }
+
+  /**
+   * Builds a map of ids (originally assigned ids -> canonicalized ids) to be 
re-assigned during
+   * canonicalization.
+   */
+  protected lazy val canonicalizationIdMap: Map[Long, Long] = {

Review Comment:
   ```suggestion
 private lazy val canonicalizationIdMap: Map[Long, Long] = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


gene-db commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542249535


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *

Review Comment:
   removed.



##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(

Review Comment:
   done.



##
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -6594,6 +6594,24 @@ object functions {
 fnWithOptions("from_json", options, e, schema)
   }
 
+  /**
+   * Parses a JSON string and constructs a Variant value.
+   *
+   * @param json a JSON string.
+   *
+   * @since 4.0.0
+   */
+  def parse_json(json: String): Column = parse_json(lit(json))

Review Comment:
   removed.



##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *
+>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ])
+>>> df.select(to_json(parse_json(df.json)).alias("v")).collect()

Review Comment:
   Removed alias.



##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa

Review Comment:
   removed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47491][CORE] Add `slf4j-api` jar to the class path first before the others of `jars` directory [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on PR #45618:
URL: https://github.com/apache/spark/pull/45618#issuecomment-2024317505

   @dongjoon-hyun 
   
   It's a bit magical, I found that in the latest test, `Run / Build modules 
using Maven: repl,sql#hive-thriftserver` also turned green, but before 
yesterday it was still red. May I ask if this is an additional benefit for this 
PR?
   
   -  14 hours ago: 
https://github.com/apache/spark/actions/runs/8452197772/job/23152049324
   - 2 days ago: 
https://github.com/apache/spark/actions/runs/8436403960/job/23138388265


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


viirya commented on code in PR #45740:
URL: https://github.com/apache/spark/pull/45740#discussion_r1542253614


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -420,6 +420,10 @@ class SparkContext(config: SparkConf) extends Logging {
 // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s
 // We can remove this after Apache Hadoop 3.4.1 releases
 conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", 
"30s")
+// Enable Magic Committer by default for all S3 buckets if hadoop-cloud 
module exists
+if 
(Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"))
 {
+  
conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", 
"true")
+}

Review Comment:
   If the class is not there, will setting the config cause any problem?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


zhengruifeng commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542253499


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *
+>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ])
+>>> df.select(to_json(parse_json(df.json)).alias("v")).collect()

Review Comment:
   let's do not use alias to also verify the default column name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47491][CORE] Add `slf4j-api` jar to the class path first before the others of `jars` directory [spark]

2024-03-27 Thread via GitHub


LuciferYang commented on PR #45618:
URL: https://github.com/apache/spark/pull/45618#issuecomment-2024309323

   late LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


yaooqinn commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024306179

   LGTM from my side. Thank you @dongjoon-hyun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46894][PYTHON] Move PySpark error conditions into standalone JSON file [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on PR #44920:
URL: https://github.com/apache/spark/pull/44920#issuecomment-2024304980

   Just to make sure, does it work when you install PySpark as a ZIP file? 
e.g., downloading it from https://spark.apache.org/downloads.html would install 
PySpark as a ZIP file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]

2024-03-27 Thread via GitHub


yaooqinn commented on PR #45620:
URL: https://github.com/apache/spark/pull/45620#issuecomment-2024304655

   FYI, ANSI SQL STANDARD 1999

![image](https://github.com/apache/spark/assets/8326978/772375de-11cd-48d0-85ae-71adbbcb5b78)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on code in PR #45740:
URL: https://github.com/apache/spark/pull/45740#discussion_r1542247573


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -420,6 +420,14 @@ class SparkContext(config: SparkConf) extends Logging {
 // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s
 // We can remove this after Apache Hadoop 3.4.1 releases
 conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", 
"30s")
+try {
+  // Try to enable Magic Committer by default for all buckets
+  
Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")

Review Comment:
   Thank you, @yaooqinn .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]

2024-03-27 Thread via GitHub


panbingkun commented on code in PR #45714:
URL: https://github.com/apache/spark/pull/45714#discussion_r1542242532


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.variant
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.util.BadRecordException
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.types.variant.{VariantBuilder, 
VariantSizeLimitException, VariantUtil}
+import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
+
+/**
+ * A utility class for constructing variant expressions.
+ */
+object VariantExpressionEvalUtils {
+
+  def parseJson(input: UTF8String): VariantVal = {
+try {
+  val v = VariantBuilder.parseJson(input.toString)
+  new VariantVal(v.getValue, v.getMetadata)
+} catch {
+  case _: VariantSizeLimitException =>
+throw 
QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")
+  case NonFatal(e) =>
+throw 
QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(

Review Comment:
   https://github.com/apache/spark/assets/15246973/7eef5c95-e6a1-4f02-90db-05526c68b9fe;>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]

2024-03-27 Thread via GitHub


yaooqinn commented on PR #45620:
URL: https://github.com/apache/spark/pull/45620#issuecomment-2024294704

   Interesting, the java platform says it's a whitespace 
https://docs.oracle.com/javase/8/docs/api/java/lang/Character.html#isWhitespace-char-


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


yaooqinn commented on code in PR #45740:
URL: https://github.com/apache/spark/pull/45740#discussion_r1542229026


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -420,6 +420,14 @@ class SparkContext(config: SparkConf) extends Logging {
 // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s
 // We can remove this after Apache Hadoop 3.4.1 releases
 conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", 
"30s")
+try {
+  // Try to enable Magic Committer by default for all buckets
+  
Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")

Review Comment:
   Use `Utils.classIsLoadable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190834


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(

Review Comment:
   should be listed in `python/docs/source/reference/pyspark.sql/functions.rst`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190297


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *
+>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ])
+>>> df.select(to_json(parse_json(df.json)).alias("v")).collect()

Review Comment:
   Let's probably use `show()`. instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190658


##
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -6594,6 +6594,24 @@ object functions {
 fnWithOptions("from_json", options, e, schema)
   }
 
+  /**
+   * Parses a JSON string and constructs a Variant value.
+   *
+   * @param json a JSON string.
+   *
+   * @since 4.0.0
+   */
+  def parse_json(json: String): Column = parse_json(lit(json))

Review Comment:
   I would remove this String type signature



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190297


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *
+>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ])
+>>> df.select(to_json(parse_json(df.json)).alias("v")).collect()

Review Comment:
   Let's probably use `show()`. instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190216


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa
+
+Returns
+---
+:class:`~pyspark.sql.Column`
+a new column of VariantType.
+
+Examples
+
+>>> from pyspark.sql.types import *

Review Comment:
   this too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on code in PR #45741:
URL: https://github.com/apache/spark/pull/45741#discussion_r1542190154


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15098,6 +15098,38 @@ def from_json(
 return _invoke_function("from_json", _to_java_column(col), schema, 
_options_to_str(options))
 
 
+@_try_remote_functions
+def parse_json(
+col: "ColumnOrName",
+) -> Column:
+"""
+Parses a column containing a JSON string into a :class:`VariantType`.
+
+.. versionadded:: 4.0.0
+
+Parameters
+--
+col : :class:`~pyspark.sql.Column` or str
+a column or column name JSON formatted strings
+
+.. # noqa

Review Comment:
   I think we don't need this (?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #44942:
URL: https://github.com/apache/spark/pull/44942#discussion_r1542184876


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala:
##
@@ -60,23 +60,15 @@ case class UnaryMinus(
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = 
dataType match {
 case _: DecimalType => defineCodeGen(ctx, ev, c => s"$c.unary_$$minus()")
-case ByteType | ShortType if failOnError =>
+case ByteType | ShortType | IntegerType | LongType if failOnError =>
+  val typeUtils = TypeUtils.getClass.getCanonicalName.stripSuffix("$")
+  val refDataType = ctx.addReferenceObj("refDataType", dataType, 
dataType.getClass.getName)
   nullSafeCodeGen(ctx, ev, eval => {
 val javaBoxedType = CodeGenerator.boxedType(dataType)
-val javaType = CodeGenerator.javaType(dataType)
-val originValue = ctx.freshName("origin")
 s"""
-   |$javaType $originValue = ($javaType)($eval);
-   |if ($originValue == $javaBoxedType.MIN_VALUE) {
-   |  throw 
QueryExecutionErrors.unaryMinusCauseOverflowError($originValue);
-   |}
-   |${ev.value} = ($javaType)(-($originValue));
-   """.stripMargin
-  })
-case IntegerType | LongType if failOnError =>
-  val mathUtils = MathUtils.getClass.getCanonicalName.stripSuffix("$")
-  nullSafeCodeGen(ctx, ev, eval => {
-s"${ev.value} = $mathUtils.negateExact($eval);"

Review Comment:
   The previous code is more efficient as it leverages the static data type 
information, while the new code simply passes the data type as a parameter, 
which means we will do data type case match per row. It will cause significant 
perf regression.
   
   I suggest that we add more overloads of `MathUtils.negateExact` to take byte 
and short, so that the generated code can still call it directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47543][CONNECT][PYTHON] Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation [spark]

2024-03-27 Thread via GitHub


HyukjinKwon closed pull request #45699: [SPARK-47543][CONNECT][PYTHON] 
Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation
URL: https://github.com/apache/spark/pull/45699


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47543][CONNECT][PYTHON] Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation [spark]

2024-03-27 Thread via GitHub


HyukjinKwon commented on PR #45699:
URL: https://github.com/apache/spark/pull/45699#issuecomment-2024197689

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47366] Add pyspark and dataframe parse_json aliases [spark]

2024-03-27 Thread via GitHub


gene-db opened a new pull request, #45741:
URL: https://github.com/apache/spark/pull/45741

   ### What changes were proposed in this pull request?
   
   Added the `parse_json` function alias for pyspark and dataframe APIs.
   
   ### Why are the changes needed?
   
   Improves usability of the `parse_json` function.
   
   ### Does this PR introduce _any_ user-facing change?
   Before this change, the following would not be possible:
   ```
   df.select(parse_json(df.json)).collect()
   ```
   
   ### How was this patch tested?
   Added unit tests and manual testing.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024185868

   All tests passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]

2024-03-27 Thread via GitHub


kelvinjian-db commented on PR #45649:
URL: https://github.com/apache/spark/pull/45649#issuecomment-2024173902

   @cloud-fan @bersprockets can you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47570][SS] Integrate range scan encoder changes with timer implementation [spark]

2024-03-27 Thread via GitHub


jingz-db commented on code in PR #45709:
URL: https://github.com/apache/spark/pull/45709#discussion_r1541998642


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -188,9 +187,12 @@ class TimerStateImpl(
 
   /**
* Function to get all the registered timers for all grouping keys
+   * @param expiryTimestampMs Threshold for expired timestamp in milliseconds, 
this function

Review Comment:
   Thanks for pointing this out (I did not realize this before but looks like 
we did the right thing)!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47569][SQL] Disallow comparing variant. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on PR #45726:
URL: https://github.com/apache/spark/pull/45726#issuecomment-2024158327

   @cloud-fan Could you help review this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

2024-03-27 Thread via GitHub


jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1542128879


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
   case _ =>
 }
 
-if (isStreaming) {
-  child.execute().mapPartitionsWithStateStore[InternalRow](
+if (hasInitialState) {
+  val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+  val hadoopConfBroadcast = sparkContext.broadcast(
+new 
SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+  child.execute().stateStoreAwareZipPartitions(
+initialState.execute(),
 getStateInfo,
-schemaForKeyRow,
-schemaForValueRow,
-NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
-session.sqlContext.sessionState,
-Some(session.sqlContext.streams.stateStoreCoordinator),
-useColumnFamilies = true,
-useMultipleValuesPerKey = true
-  ) {
-case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-  processData(store, singleIterator)
+storeNames = Seq(),
+session.sqlContext.streams.stateStoreCoordinator) {
+// The state store aware zip partitions will provide us with two 
iterators,
+// child data iterator and the initial state iterator per partition.
+case (partitionId, childDataIterator, initStateIterator) =>
+  if (isStreaming) {
+val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+  stateInfo.get.operatorId, partitionId)
+val storeProviderId = StateStoreProviderId(stateStoreId, 
stateInfo.get.queryRunId)
+val store = StateStore.get(
+  storeProviderId = storeProviderId,
+  keySchema = schemaForKeyRow,
+  valueSchema = schemaForValueRow,
+  NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+  version = stateInfo.get.storeVersion,
+  useColumnFamilies = true,
+  storeConf = storeConf,
+  hadoopConf = hadoopConfBroadcast.value.value
+)
+
+processDataWithInitialState(store, childDataIterator, 
initStateIterator)
+  } else {
+val providerId = {
+  val tempDirPath = Utils.createTempDir().getAbsolutePath
+  new StateStoreProviderId(
+StateStoreId(tempDirPath, 0, partitionId), 
getStateInfo.queryRunId)
+}
+val sqlConf = new SQLConf()
+sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+  classOf[RocksDBStateStoreProvider].getName)
+
+// Create StateStoreProvider for this partition
+val stateStoreProvider = StateStoreProvider.createAndInit(
+  providerId,
+  schemaForKeyRow,
+  schemaForValueRow,
+  NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+  useColumnFamilies = true,
+  storeConf = new StateStoreConf(sqlConf),
+  hadoopConf = hadoopConfBroadcast.value.value,
+  useMultipleValuesPerKey = true)
+val store = stateStoreProvider.getStore(0)
+
+processDataWithInitialState(store, childDataIterator, 
initStateIterator)

Review Comment:
   Good advice! Refactored duplicated codes into 
`initNewStateStoreAndProcessData()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]

2024-03-27 Thread via GitHub


cashmand commented on PR #45703:
URL: https://github.com/apache/spark/pull/45703#issuecomment-2024035501

   @cloud-fan @dongjoon-hyun 
   
   It looks like the checks I added are only effective for the vectorized 
reader, and I probably need to add similar checks for parquet-mr (and modify my 
unit test to test with and without parquet-mr). I think it's fine to merge this 
PR and make the parquet-mr fixes in a follow-up, but if you prefer, I can work 
on adding the checks for parquet-mr to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

2024-03-27 Thread via GitHub


HeartSaVioR closed pull request #45485: [SPARK-47107][SS][PYTHON] Implement 
partition reader for python streaming data source
URL: https://github.com/apache/spark/pull/45485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]

2024-03-27 Thread via GitHub


HeartSaVioR commented on PR #45485:
URL: https://github.com/apache/spark/pull/45485#issuecomment-2024028916

   In the meanwhile, I'm merging to master as CI has passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024025159

   Thank you, @viirya .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


sahnib commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1542070936


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -90,16 +92,13 @@ case class TransformWithStateExec(
 
   override def keyExpressions: Seq[Attribute] = groupingAttributes
 
-  protected val schemaForKeyRow: StructType = new StructType().add("key", 
BinaryType)
-
-  protected val schemaForValueRow: StructType = new StructType().add("value", 
BinaryType)
-
   override def requiredChildDistribution: Seq[Distribution] = {
 StatefulOperatorPartitioning.getCompatibleDistribution(groupingAttributes,
   getStateInfo, conf) ::
   Nil
   }
 
+

Review Comment:
   Removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


sahnib commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1542070036


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -26,10 +26,10 @@ import 
org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expressi
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution._
+import 
org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA,
 VALUE_ROW_SCHEMA}

Review Comment:
   Yeah, makes sense. Renamed this to `TransformWithStateKeyValueRowSchema`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]

2024-03-27 Thread via GitHub


amaliujia commented on code in PR #45729:
URL: https://github.com/apache/spark/pull/45729#discussion_r1542066261


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+abstract class LoggingSuiteBase
+extends AnyFunSuite // scalastyle:ignore funsuite
+with BeforeAndAfterAll
+with BeforeAndAfterEach
+with Logging {
+  protected val outContent = new ByteArrayOutputStream()
+  protected val originalErr = System.err
+
+  override def beforeAll(): Unit = {
+val teeStream = new TeeOutputStream(originalErr, outContent)
+System.setErr(new PrintStream(teeStream))
+  }
+
+  override def afterAll(): Unit = {
+System.setErr(originalErr)
+  }
+
+  override def afterEach(): Unit = {
+outContent.reset()
+  }
+}
+
+class StructuredLoggingSuite extends LoggingSuiteBase {
+  val className = this.getClass.getName.stripSuffix("$")
+  override def beforeAll(): Unit = {
+super.beforeAll()
+Logging.enableStructuredLogging()
+  }
+
+  test("Structured logging") {
+val msg = "This is a log message"
+logError(msg)
+
+val logOutput = 
outContent.toString.split("\n").filter(_.contains(msg)).head
+assert(logOutput.nonEmpty)
+// scalastyle:off line.size.limit
+val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log 
message","logger":"$className"}""".r
+// scalastyle:on
+assert(pattern.matches(logOutput))

Review Comment:
   Regarding to the test case readability, I am wondering if we at last put the 
value of the `logOutput` as a comment here with newlines and whitespaces 
inserted to have better readability, so we can read the pattern and then 
quickly read the value in comment to understand what this test case does?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]

2024-03-27 Thread via GitHub


amaliujia commented on code in PR #45729:
URL: https://github.com/apache/spark/pull/45729#discussion_r1542066261


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+abstract class LoggingSuiteBase
+extends AnyFunSuite // scalastyle:ignore funsuite
+with BeforeAndAfterAll
+with BeforeAndAfterEach
+with Logging {
+  protected val outContent = new ByteArrayOutputStream()
+  protected val originalErr = System.err
+
+  override def beforeAll(): Unit = {
+val teeStream = new TeeOutputStream(originalErr, outContent)
+System.setErr(new PrintStream(teeStream))
+  }
+
+  override def afterAll(): Unit = {
+System.setErr(originalErr)
+  }
+
+  override def afterEach(): Unit = {
+outContent.reset()
+  }
+}
+
+class StructuredLoggingSuite extends LoggingSuiteBase {
+  val className = this.getClass.getName.stripSuffix("$")
+  override def beforeAll(): Unit = {
+super.beforeAll()
+Logging.enableStructuredLogging()
+  }
+
+  test("Structured logging") {
+val msg = "This is a log message"
+logError(msg)
+
+val logOutput = 
outContent.toString.split("\n").filter(_.contains(msg)).head
+assert(logOutput.nonEmpty)
+// scalastyle:off line.size.limit
+val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log 
message","logger":"$className"}""".r
+// scalastyle:on
+assert(pattern.matches(logOutput))

Review Comment:
   Regarding to the test case readability, I am wondering if we at last put the 
value of the `logOutput` as a comment here with newlines and whitespaces 
inserted to have better readability, so we can read the pattern and the quickly 
read the value in comment to understand what this test case does?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]

2024-03-27 Thread via GitHub


amaliujia commented on code in PR #45729:
URL: https://github.com/apache/spark/pull/45729#discussion_r1542066757


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+abstract class LoggingSuiteBase
+extends AnyFunSuite // scalastyle:ignore funsuite
+with BeforeAndAfterAll
+with BeforeAndAfterEach
+with Logging {
+  protected val outContent = new ByteArrayOutputStream()
+  protected val originalErr = System.err
+
+  override def beforeAll(): Unit = {
+val teeStream = new TeeOutputStream(originalErr, outContent)
+System.setErr(new PrintStream(teeStream))
+  }
+
+  override def afterAll(): Unit = {
+System.setErr(originalErr)
+  }
+
+  override def afterEach(): Unit = {
+outContent.reset()
+  }
+}
+
+class StructuredLoggingSuite extends LoggingSuiteBase {
+  val className = this.getClass.getName.stripSuffix("$")
+  override def beforeAll(): Unit = {
+super.beforeAll()
+Logging.enableStructuredLogging()
+  }
+
+  test("Structured logging") {
+val msg = "This is a log message"
+logError(msg)
+
+val logOutput = 
outContent.toString.split("\n").filter(_.contains(msg)).head
+assert(logOutput.nonEmpty)
+// scalastyle:off line.size.limit
+val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log 
message","logger":"$className"}""".r
+// scalastyle:on
+assert(pattern.matches(logOutput))

Review Comment:
   Please ignore if the value is not stable thus whenever we put it in the 
comment, it becomes stable very soon. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]

2024-03-27 Thread via GitHub


amaliujia commented on code in PR #45729:
URL: https://github.com/apache/spark/pull/45729#discussion_r1542066261


##
common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala:
##
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKey.EXECUTOR_ID
+
+abstract class LoggingSuiteBase
+extends AnyFunSuite // scalastyle:ignore funsuite
+with BeforeAndAfterAll
+with BeforeAndAfterEach
+with Logging {
+  protected val outContent = new ByteArrayOutputStream()
+  protected val originalErr = System.err
+
+  override def beforeAll(): Unit = {
+val teeStream = new TeeOutputStream(originalErr, outContent)
+System.setErr(new PrintStream(teeStream))
+  }
+
+  override def afterAll(): Unit = {
+System.setErr(originalErr)
+  }
+
+  override def afterEach(): Unit = {
+outContent.reset()
+  }
+}
+
+class StructuredLoggingSuite extends LoggingSuiteBase {
+  val className = this.getClass.getName.stripSuffix("$")
+  override def beforeAll(): Unit = {
+super.beforeAll()
+Logging.enableStructuredLogging()
+  }
+
+  test("Structured logging") {
+val msg = "This is a log message"
+logError(msg)
+
+val logOutput = 
outContent.toString.split("\n").filter(_.contains(msg)).head
+assert(logOutput.nonEmpty)
+// scalastyle:off line.size.limit
+val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log 
message","logger":"$className"}""".r
+// scalastyle:on
+assert(pattern.matches(logOutput))

Review Comment:
   Regarding to the test case readability, I am wondering if we at last put the 
value of the `logOutput` as a comment here so we can read the pattern and the 
quickly read the value in comment to understand what this test case does?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024012897

   Also, cc @viirya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


sahnib commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1542064062


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -77,14 +78,20 @@ class StatefulProcessorHandleImpl(
 store: StateStore,
 runId: UUID,
 keyEncoder: ExpressionEncoder[Any],
+ttlMode: TTLMode,
 timeoutMode: TimeoutMode,
-isStreaming: Boolean = true)
+isStreaming: Boolean = true,
+batchTimestampMs: Option[Long] = None,
+eventTimeWatermarkMs: Option[Long] = None)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._
 
+  private val ttlStates: util.List[TTLState] = new util.ArrayList[TTLState]()
+
   private val BATCH_QUERY_ID = "----"
-  private def buildQueryInfo(): QueryInfo = {
+  logInfo(s"Created StatefulProcessorHandle")

Review Comment:
   Removed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun commented on PR #45740:
URL: https://github.com/apache/spark/pull/45740#issuecomment-2024005090

   Hi, do you have any concern on using S3 Magic Committer by default, 
@steveloughran ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]

2024-03-27 Thread via GitHub


dongjoon-hyun opened a new pull request, #45740:
URL: https://github.com/apache/spark/pull/45740

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]

2024-03-27 Thread via GitHub


sebastianhillig-db commented on code in PR #45635:
URL: https://github.com/apache/spark/pull/45635#discussion_r1542043094


##
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##
@@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[Long]) = {
 if (useDaemon) {
   self.synchronized {
-if (idleWorkers.nonEmpty) {
+// Pull from idle workers until we one that is alive, otherwise create 
a new one.
+while (idleWorkers.nonEmpty) {
   val worker = idleWorkers.dequeue()
-  worker.selectionKey.interestOps(SelectionKey.OP_READ | 
SelectionKey.OP_WRITE)
-  return (worker, daemonWorkers.get(worker))
+  val workerHandle = daemonWorkers(worker)

Review Comment:
   Since the idleWorkers queue keeps the reference alive, there should never be 
a case where the daemonWorker disappears until it is retrieved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]

2024-03-27 Thread via GitHub


sebastianhillig-db commented on code in PR #45635:
URL: https://github.com/apache/spark/pull/45635#discussion_r1542043889


##
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##
@@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[Long]) = {
 if (useDaemon) {
   self.synchronized {
-if (idleWorkers.nonEmpty) {
+// Pull from idle workers until we one that is alive, otherwise create 
a new one.
+while (idleWorkers.nonEmpty) {

Review Comment:
   Promise not to force push again: 
https://github.com/apache/spark/pull/45635/files#diff-1bd846874b06327e6abd0803aa74eed890352dfa974d5c1da1a12dc7477e20d0L411



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]

2024-03-27 Thread via GitHub


sebastianhillig-db commented on code in PR #45635:
URL: https://github.com/apache/spark/pull/45635#discussion_r1542041741


##
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##
@@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[Long]) = {
 if (useDaemon) {
   self.synchronized {
-if (idleWorkers.nonEmpty) {
+// Pull from idle workers until we one that is alive, otherwise create 
a new one.
+while (idleWorkers.nonEmpty) {

Review Comment:
   Ugh, sorry - the force push broke that link. I'm referring to 
"releaseWorker" using the same synchronization, so we should not be adding new 
workers while this code runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47570][SS] Integrate range scan encoder changes with timer implementation [spark]

2024-03-27 Thread via GitHub


jingz-db commented on code in PR #45709:
URL: https://github.com/apache/spark/pull/45709#discussion_r1541998642


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -188,9 +187,12 @@ class TimerStateImpl(
 
   /**
* Function to get all the registered timers for all grouping keys
+   * @param expiryTimestampMs Threshold for expired timestamp in milliseconds, 
this function

Review Comment:
   Thanks for pointing this out (I did not realize this before but looks like 
we did the right thing)! We've also left out those implementation details in 
the user facing function `getExpiredTimers()` in `StatefulProcessorHandleimpl` 
here:
   
https://github.com/jingz-db/spark/blob/36ab93519ae1ab43e20926ae47406cff668a3b10/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala#L166-L174



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]

2024-03-27 Thread via GitHub


ueshin commented on code in PR #45635:
URL: https://github.com/apache/spark/pull/45635#discussion_r1541986448


##
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##
@@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[Long]) = {
 if (useDaemon) {
   self.synchronized {
-if (idleWorkers.nonEmpty) {
+// Pull from idle workers until we one that is alive, otherwise create 
a new one.
+while (idleWorkers.nonEmpty) {
   val worker = idleWorkers.dequeue()
-  worker.selectionKey.interestOps(SelectionKey.OP_READ | 
SelectionKey.OP_WRITE)
-  return (worker, daemonWorkers.get(worker))
+  val workerHandle = daemonWorkers(worker)

Review Comment:
   Do we have a chance that this throws an exception?
   If so, should we use `daemonWorkers.get(worker)` same as before, just in 
case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541963979


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   In case you are interested, I have a draft for the manual codegn version. I 
think I can add it in a follow-up PR. Personally, I don't feel the code has any 
red flag, and it is much better than the `StaticInvoke` approach.
   
   ```
 protected override def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
   val childCode = child.genCode(ctx)
   val tmp = ctx.freshVariable("tmp", classOf[Object])
   val parsedPathArg = ctx.addReferenceObj("parsedPath", parsedPath)
   val dataTypeArg = ctx.addReferenceObj("dataType", dataType)
   val zoneIdArg = ctx.addReferenceObj("zoneId", timeZoneId)
   val code = code"""
 ${childCode.code}
 boolean ${ev.isNull} = ${childCode.isNull};
 ${CodeGenerator.javaType(dataType)} ${ev.value} = 
${CodeGenerator.defaultValue(dataType)};
 if (!${ev.isNull}) {
   Object $tmp = 
org.apache.spark.sql.catalyst.expressions.variant.VariantGet.variantGet(
 ${childCode.value}, $parsedPathArg, $dataTypeArg, $failOnError, 
$zoneIdArg);
 

Re: [PR] [SPARK-47485][SQL][PYTHON][CONNECT] Create column with collations in dataframe API [spark]

2024-03-27 Thread via GitHub


MaxGekk closed pull request #45569: [SPARK-47485][SQL][PYTHON][CONNECT] Create 
column with collations in dataframe API
URL: https://github.com/apache/spark/pull/45569


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47485][SQL][PYTHON][CONNECT] Create column with collations in dataframe API [spark]

2024-03-27 Thread via GitHub


MaxGekk commented on PR #45569:
URL: https://github.com/apache/spark/pull/45569#issuecomment-2023774837

   +1, LGTM. Merging to master.
   Thank you, @stefankandic and @HyukjinKwon for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541767305


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   I have done some experiments with the `StaticInvoke` approach. Suppose I 
have encapsulated the `VariantGet` implementation into the following function:
   ```
   case object VariantGetCodegen {
 def variantGet(input: VariantVal, parsedPath: Array[PathSegment],
dataType: DataType, failOnError: Boolean, zoneId: 
Option[String]): Any = {...}
   }
   ```
   and make `VariantGet` a `RuntimeReplaceable` expression with a replacement 
of `StaticInvoke` that invokes `VariantGetCodegen.variantGet`. It still won't 
directly work because the codegen logic of `StaticInvoke` assumes the return 
type of the method directly matches the return type, but the return type of 
`VariantGetCodegen.variantGet` is `Any`.
   
   In order to make it work, I have to create a wrapper for each return type, 
like:
   
   ```
   case object VariantGetCodegen {
 def variantGetByte(input: VariantVal, parsedPath: Array[PathSegment],
dataType: DataType, failOnError: Boolean, zoneId: 
Option[String]): Byte =
   

Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541767305


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   I have done some experiments with the `StaticInvoke` approach. Suppose I 
have encapsulated the `VariantGet` implementation into the following function:
   ```
   case object VariantGetCodegen {
 def variantGet(input: VariantVal, parsedPath: Array[PathSegment],
dataType: DataType, failOnError: Boolean, zoneId: 
Option[String]): Any = {...}
   }
   ```
   and make `VariantGet` a `RuntimeReplaceable` expression with a replacement 
of `StaticInvoke` that invokes `VariantGetCodegen.variantGet`. It still won't 
directly work because the codegen logic of `StaticInvoke` assumes the return 
type of the method directly matches the return type, but the return type of 
`VariantGetCodegen.variantGet` is `Any`.
   
   In order to make it work, I have to create a wrapper for each return type, 
like:
   
   ```
   case object VariantGetCodegen {
 def variantGetByte(input: VariantVal, parsedPath: Array[PathSegment],
dataType: DataType, failOnError: Boolean, zoneId: 
Option[String]): Byte =
   

Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]

2024-03-27 Thread via GitHub


srielau commented on PR #45620:
URL: https://github.com/apache/spark/pull/45620#issuecomment-2023459267

   > Is there any reason for choosing the current Unicode Character set? I saw 
that `u000B` is not included.
   
   @yaooqinn VT is not a whitespace. It is a segment-separator.
   Since it is traditionally not included (only TAB is commonly included) I 
don't want to overplay this.
   We can always revisit later.
   https://www.compart.com/en/unicode/bidiclass/S
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]

2024-03-27 Thread via GitHub


aarondav commented on code in PR #45150:
URL: https://github.com/apache/spark/pull/45150#discussion_r1541567581


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -435,6 +438,16 @@ message ExecutePlanResponse {
 // the execution is complete. If the server sends onComplete without 
sending a ResultComplete,
 // it means that there is more, and the client should use ReattachExecute 
RPC to continue.
   }
+
+  // This message is used to communicate progress about the query progress 
during the execution.
+  message ExecutionProgress {
+int64 num_tasks = 1;
+int64 num_completed_tasks = 2;

Review Comment:
   (Just my 2c: I think having any progress bar is much better than none. The 
standard Spark progress bar has some ups and some downs, definitely having new 
progress bars appear isn't the most intuitive either. I think it's probably net 
better than one progress bar that gets longer, but I would much prefer having 
some progress bar now that we can extend later, perhaps as we get a better 
sense of how to incorporate AQE and future stages into the UX.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1541540920


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateVariableWithTTLSupport.scala:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.time.Duration
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore}
+import org.apache.spark.sql.streaming.TTLMode
+import org.apache.spark.sql.types.{BinaryType, DataType, LongType, NullType, 
StructField, StructType}
+
+object StateTTLSchema {
+  val KEY_ROW_SCHEMA: StructType = new StructType()
+.add("expirationMs", LongType)
+.add("groupingKey", BinaryType)
+  val VALUE_ROW_SCHEMA: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+}
+
+/**
+ * Encapsulates the ttl row information stored in [[SingleKeyTTLState]].
+ * @param groupingKey grouping key for which ttl is set
+ * @param expirationMs expiration time for the grouping key
+ */
+case class SingleKeyTTLRow(
+groupingKey: Array[Byte],
+expirationMs: Long)
+
+/**
+ * Represents a State variable which supports TTL.
+ */
+trait StateVariableWithTTLSupport {
+
+  /**
+   * Clears the user state associated with this grouping key
+   * if it has expired. This function is called by Spark to perform
+   * cleanup at the end of transformWithState processing.
+   *
+   * Spark uses a secondary index to determine if the user state for
+   * this grouping key has expired. However, its possible that the user
+   * has updated the TTL and secondary index is out of date. Implementations
+   * must validate that the user State has actually expired before cleanup 
based
+   * on their own State data.
+   *
+   * @param groupingKey grouping key for which cleanup should be performed.
+   */
+  def clearIfExpired(groupingKey: Array[Byte]): Unit
+}
+
+/**
+ * Represents the underlying state for secondary TTL Index for a user defined
+ * state variable.
+ *
+ * This state allows Spark to query ttl values based on expiration time
+ * allowing efficient ttl cleanup.
+ */
+trait TTLState {
+
+  /**
+   * Perform the user state clean yp based on ttl values stored in

Review Comment:
   nit: `Perform the user state clean up based`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1541540421


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateVariableWithTTLSupport.scala:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.time.Duration
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
StateStore}
+import org.apache.spark.sql.streaming.TTLMode
+import org.apache.spark.sql.types.{BinaryType, DataType, LongType, NullType, 
StructField, StructType}
+
+object StateTTLSchema {
+  val KEY_ROW_SCHEMA: StructType = new StructType()
+.add("expirationMs", LongType)
+.add("groupingKey", BinaryType)
+  val VALUE_ROW_SCHEMA: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+}
+
+/**
+ * Encapsulates the ttl row information stored in [[SingleKeyTTLState]].
+ * @param groupingKey grouping key for which ttl is set
+ * @param expirationMs expiration time for the grouping key
+ */
+case class SingleKeyTTLRow(
+groupingKey: Array[Byte],
+expirationMs: Long)
+
+/**
+ * Represents a State variable which supports TTL.
+ */
+trait StateVariableWithTTLSupport {
+
+  /**
+   * Clears the user state associated with this grouping key
+   * if it has expired. This function is called by Spark to perform
+   * cleanup at the end of transformWithState processing.
+   *
+   * Spark uses a secondary index to determine if the user state for
+   * this grouping key has expired. However, its possible that the user
+   * has updated the TTL and secondary index is out of date. Implementations
+   * must validate that the user State has actually expired before cleanup 
based
+   * on their own State data.
+   *
+   * @param groupingKey grouping key for which cleanup should be performed.
+   */
+  def clearIfExpired(groupingKey: Array[Byte]): Unit
+}
+
+/**
+ * Represents the underlying state for secondary TTL Index for a user defined
+ * state variable.
+ *
+ * This state allows Spark to query ttl values based on expiration time
+ * allowing efficient ttl cleanup.
+ */
+trait TTLState {
+
+  /**
+   * Perform the user state clean yp based on ttl values stored in
+   * this state. NOTE that its not safe to call this operation concurrently
+   * when the user can also modify the underlying State. Cleanup should be 
initiated
+   * after arbitrary state operations are completed by the user.
+   */
+  def clearExpiredState(): Unit
+}
+
+/**
+ * Manages the ttl information for user state keyed with a single key 
(grouping key).
+ */
+class SingleKeyTTLState(

Review Comment:
   nit: should we call this `SingleKeyTTLStateImpl` to be consistent ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541513308


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   I didn't mean writing everything by hand. Essentially, we create a method 
that implements `VariantGet`, and the class only needs some boilerplate code to 
call this method (similar to the code in `StaticInvoke` itself).
   
   There is still another reason why I don't like `StaticInvoke`. In the 
future, I will write some optimizer rules on `VariantGet` (e.g., to push it 
down a join). This is why I added a new `TreePattern` `VARIANT_GET` in this PR. 
The optimizer rule will run after `RuntimeReplaceable` expression is replaced, 
so it will become `StaticInvoke` and no longer has this tree pattern, and the 
optimizer rule can no longer prune expressions. Plus, matching against 
`StaticInvoke` is also more complex than matching against `VariantGet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure 

Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541513308


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   I didn't mean writing everything by hand. Essentially, we create a method 
that implements `VariantGet`, and the class only needs some boilerplate code to 
call this method (similar to the code in `StaticInvoke` itself).
   
   There is still another reason why I don't like `StaticInvoke`. In the 
future, I will write some optimizer rules on `VariantGet` (e.g., to push it 
down a join). This is why I added a new `TreePattern` ``VARIANT_GET` in this 
PR. The optimizer rule will run after `RuntimeReplaceable` expression is 
replaced, so it will become `StaticInvoke` and no longer has this tree pattern, 
and the optimizer rule can no longer prune expressions. Plus, matching against 
`StaticInvoke` is also more complex than matching against `VariantGet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure 

Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1541504006


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -77,14 +78,20 @@ class StatefulProcessorHandleImpl(
 store: StateStore,
 runId: UUID,
 keyEncoder: ExpressionEncoder[Any],
+ttlMode: TTLMode,
 timeoutMode: TimeoutMode,
-isStreaming: Boolean = true)
+isStreaming: Boolean = true,
+batchTimestampMs: Option[Long] = None,
+eventTimeWatermarkMs: Option[Long] = None)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._
 
+  private val ttlStates: util.List[TTLState] = new util.ArrayList[TTLState]()

Review Comment:
   Could we add a comment for what this list is storing ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1541502777


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##
@@ -62,33 +70,79 @@ class StateTypesEncoder[GK, V](
   private val rowToObjDeserializer = 
valExpressionEnc.resolveAndBind().createDeserializer()
   private val reusedValRow = new UnsafeRow(valEncoder.schema.fields.length)
 
+  private val NO_TTL_ENCODED_VALUE: Long = -1L

Review Comment:
   Could we avoid recording this explicitly ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]

2024-03-27 Thread via GitHub


anishshri-db commented on code in PR #45674:
URL: https://github.com/apache/spark/pull/45674#discussion_r1541502063


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##
@@ -23,11 +23,19 @@ import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.Serializer
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.state.StateStoreErrors
-import org.apache.spark.sql.types.{BinaryType, StructType}
+import org.apache.spark.sql.types.{BinaryType, LongType, StructType}
 
 object StateKeyValueRowSchema {
   val KEY_ROW_SCHEMA: StructType = new StructType().add("key", BinaryType)
-  val VALUE_ROW_SCHEMA: StructType = new StructType().add("value", BinaryType)
+  val VALUE_ROW_SCHEMA: StructType = new StructType()
+.add("value", BinaryType)
+.add("ttlExpirationMs", LongType)

Review Comment:
   Would we add a long even if ttl is disabled ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45150:
URL: https://github.com/apache/spark/pull/45150#discussion_r1541466356


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -435,6 +438,16 @@ message ExecutePlanResponse {
 // the execution is complete. If the server sends onComplete without 
sending a ResultComplete,
 // it means that there is more, and the client should use ReattachExecute 
RPC to continue.
   }
+
+  // This message is used to communicate progress about the query progress 
during the execution.
+  message ExecutionProgress {
+int64 num_tasks = 1;
+int64 num_completed_tasks = 2;

Review Comment:
   I agree we don't have to be very accurate, but is there a reason that we 
need to have a different progress bar style for Spark Connect? Is it because 
the classic Spark "stage by stage" progress bar is bad, or it's hard to 
implement?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


cloud-fan commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541456694


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   You can pass anything to `StaticInvoke`, including arbitrary java object, 
using `Literal` with `ObjectType`. I'm against writing codegen by hand, as it's 
hard to debug, and error-prone (maybe inconsistent with the interpreted 
implementation).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541443869


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   There is yet another reason against using `StaticInvoke`. The path parameter 
must be an literal, and I can make use of this requirement to avoid repeated 
path parsing. However, I cannot find how to do a similar caching in 
`StaticInvoke`.
   
   Using `StaticInvoke` won't simplify the current implementation. It can 
indeed simplify the implementation if we want to support native codegen rather 
than depending on `CodegenFallback`. I think that is an optional optimization 
we can do in the future, but I would prefer manually writing the codegen for 
`VariantGet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, 

Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]

2024-03-27 Thread via GitHub


chenhao-db commented on code in PR #45708:
URL: https://github.com/apache/spark/pull/45708#discussion_r1541443869


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##
@@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends 
UnaryExpression
   override protected def withNewChildInternal(newChild: Expression): ParseJson 
=
 copy(child = newChild)
 }
+
+// A path segment in the `VariantGet` expression. It represents either an 
object key access (when
+// `key` is not null) or an array index access (when `key` is null).
+case class PathSegment(key: String, index: Int)
+
+object VariantPathParser extends RegexParsers {
+  private def root: Parser[Char] = '$'
+
+  // Parse index segment like `[123]`.
+  private def index: Parser[PathSegment] =
+for {
+  index <- '[' ~> "\\d+".r <~ ']'
+} yield {
+  PathSegment(null, index.toInt)
+}
+
+  // Parse key segment like `.name`, `['name']`, or `["name"]`.
+  private def key: Parser[PathSegment] =
+for {
+  key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" |
+"[\"" ~> "[^\\\"\\?]+".r <~ "\"]"
+} yield {
+  PathSegment(key, 0)
+}
+
+  private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | 
index))
+
+  def parse(str: String): Option[Array[PathSegment]] = {
+this.parseAll(parser, str) match {
+  case Success(result, _) => Some(result.toArray)
+  case _ => None
+}
+  }
+}
+
+/**
+ * The implementation for `variant_get` and `try_variant_get` expressions. 
Extracts a sub-variant
+ * value according to a path and cast it into a concrete data type.
+ * @param child The source variant value to extract from.
+ * @param path A literal path expression. It has the same format as the JSON 
path.
+ * @param schema The target data type to cast into.
+ * @param failOnError Controls whether the expression should throw an 
exception or return null if
+ *the cast fails.
+ * @param timeZoneId A string identifier of a time zone. It is required by 
timestamp-related casts.
+ */
+case class VariantGet(
+child: Expression,
+path: Expression,
+schema: DataType,
+failOnError: Boolean,
+timeZoneId: Option[String] = None)
+extends BinaryExpression
+with TimeZoneAwareExpression
+with NullIntolerant
+with ExpectsInputTypes
+with CodegenFallback
+with QueryErrorsBase {
+  override def checkInputDataTypes(): TypeCheckResult = {
+val check = super.checkInputDataTypes()
+if (check.isFailure) {
+  check
+} else if (!path.foldable) {
+  DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("path"),
+  "inputType" -> toSQLType(path.dataType),
+  "inputExpr" -> toSQLExpr(path)
+)
+  )
+} else if (!VariantGet.checkDataType(schema)) {
+  DataTypeMismatch(
+errorSubClass = "CAST_WITHOUT_SUGGESTION",
+messageParameters = Map(
+  "srcType" -> toSQLType(VariantType),
+  "targetType" -> toSQLType(schema)
+)
+  )
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override lazy val dataType: DataType = schema.asNullable
+
+  @transient private lazy val parsedPath = {
+val pathValue = path.eval().toString
+VariantPathParser.parse(pathValue).getOrElse {
+  throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName)
+}
+  }
+
+  final override def nodePatternsInternal(): Seq[TreePattern] = 
Seq(VARIANT_GET)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType)
+
+  override def prettyName: String = if (failOnError) "variant_get" else 
"try_variant_get"
+
+  override def nullable: Boolean = true
+
+  protected override def nullSafeEval(input: Any, path: Any): Any = {

Review Comment:
   There is yet another reason against using `StaticInvoke`. The path parameter 
must be an literal, and I can make use of this requirement to avoid repeated 
path parsing. However, I cannot find how to do a similar caching in 
`StaticInvoke`.
   
   Using `StaticInvoke` won't simplify the current implementation. It can 
indeed simplify the implementation if we want to support native codegen rather 
than depending on `CodegenFallback`. I think that is an optional optimization 
we can do in the future, when we can manually write the codegen for 
`VariantGet`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: 

Re: [PR] [SPARK-46840][SQL][TESTS] Add `CollationBenchmark` [spark]

2024-03-27 Thread via GitHub


GideonPotok commented on code in PR #45453:
URL: https://github.com/apache/spark/pull/45453#discussion_r1541440974


##
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala:
##
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import scala.concurrent.duration._
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.functions._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Benchmark to measure performance for comparisons between collated strings. 
To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *  bin/spark-submit --class 
+ *--jars , 
+ *   2. build/sbt "sql/Test/runMain 
org.apache.spark.sql.execution.benchmark.CollationBenchmark"
+ *   3. generate result:
+ *  SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain "
+ *  Results will be written to "benchmarks/CollationBenchmark-results.txt".
+ * }}}
+ */
+
+object CollationBenchmark extends SqlBasedBenchmark {
+  private val collationTypes = Seq("UTF8_BINARY_LCASE", "UNICODE", 
"UTF8_BINARY", "UNICODE_CI")
+
+  def generateSeqInput(n: Long): Seq[UTF8String] = {
+val input = Seq("ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", 
"def", "def",
+  "GHI", "ghi", "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", 
"VWX", "vwx",
+  "ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", 
"GHI", "ghi",
+  "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", 
"YZ")
+  .map(UTF8String.fromString)
+val inputLong: Seq[UTF8String] = (0L until n).map(i => input(i.toInt % 
input.size))
+inputLong
+  }
+
+  private def getDataFrame(strings: Seq[String]): DataFrame = {
+val asPairs = strings.sliding(2, 1).toSeq.map {
+  case Seq(s1, s2) => (s1, s2)
+}
+val d = spark.createDataFrame(asPairs).toDF("s1", "s2")
+d
+  }
+
+  private def generateDataframeInput(l: Long): DataFrame = {
+getDataFrame(generateSeqInput(l).map(_.toString))
+  }
+
+  def benchmarkUTFStringEquals(collationTypes: Seq[String], utf8Strings: 
Seq[UTF8String]): Unit = {
+val sublistStrings = utf8Strings
+
+val benchmark = new Benchmark(
+  "collation unit benchmarks - equalsFunction",
+  utf8Strings.size * 10,
+  warmupTime = 4.seconds,
+  output = output)
+collationTypes.foreach(collationType => {
+  val collation = CollationFactory.fetchCollation(collationType)
+  benchmark.addCase(s"$collationType") { _ =>
+sublistStrings.foreach(s1 =>
+  utf8Strings.foreach(s =>
+(0 to 10).foreach(_ =>
+  collation.equalsFunction(s, s1).booleanValue())
+  )
+)
+  }
+}
+)
+benchmark.run()
+  }
+  def benchmarkUTFStringCompare(collationTypes: Seq[String], utf8Strings: 
Seq[UTF8String]): Unit = {
+val sublistStrings = utf8Strings
+
+val benchmark = new Benchmark(
+  "collation unit benchmarks - compareFunction",
+  utf8Strings.size * 10,
+  warmupTime = 4.seconds,
+  output = output)
+collationTypes.foreach(collationType => {
+  val collation = CollationFactory.fetchCollation(collationType)
+  benchmark.addCase(s"$collationType") { _ =>
+sublistStrings.foreach(s1 =>
+  utf8Strings.foreach(s =>
+(0 to 10).foreach(_ =>
+  collation.comparator.compare(s, s1)
+)
+  )
+)
+  }
+}
+)
+benchmark.run()
+  }
+
+  def benchmarkUTFStringHashFunction(
+  collationTypes: Seq[String],
+  utf8Strings: Seq[UTF8String]): Unit = {
+val sublistStrings = utf8Strings
+
+val benchmark = new Benchmark(
+  "collation unit benchmarks - hashFunction",
+  utf8Strings.size * 10,
+  warmupTime = 4.seconds,
+  output = output)
+collationTypes.foreach(collationType => {
+  val collation = CollationFactory.fetchCollation(collationType)
+  benchmark.addCase(s"$collationType") { _ =>
+sublistStrings.foreach(_ =>
+  utf8Strings.foreach(s =>

[PR] [SPARK-47617] Add TPC-DS testing infrastructure for collations [spark]

2024-03-27 Thread via GitHub


nikolamand-db opened a new pull request, #45739:
URL: https://github.com/apache/spark/pull/45739

   
   
   ### What changes were proposed in this pull request?
   
   
   **PR branch is currently based off of 
https://github.com/apache/spark/pull/45383 because implicit casting is required 
for the checks to work. Will resolve with master once the dependency PR is 
merged.**
   
   We can utilize TPC-DS testing infrastructure already present in Spark. The 
idea is to vary TPC-DS table string columns by adding multiple collations with 
different ordering rules and case sensitivity, producing new tables. These 
tables should yield the same results against predefined TPC-DS queries for 
certain batches of collations. For example, when comparing query runs on table 
where columns are first collated as `UTF8_BINARY` and then as 
`UTF8_BINARY_LCASE`, we should be getting same results after converting to 
lowercase.
   
   Introduce new query suite which tests the described behavior with available 
collations (utf8_binary and unicode) combined with case conversions (lowercase, 
uppercase, randomized case for fuzzy testing).
   
   ### Why are the changes needed?
   
   
   Improve collations testing coverage.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No.
   
   ### How was this patch tested?
   
   
   Added TPC-DS collations query suite.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >