Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on PR #45744: URL: https://github.com/apache/spark/pull/45744#issuecomment-2024460329 @cloud-fan I'm very sorry, I broke the branch corresponding to the previously reviewed PR. Can I use this new PR to submit this feature? The reviewed PR: https://github.com/apache/spark/pull/45714
[PR] Refine docstring of `try_sum`, `try_avg`, `avg`, `sum`, `mean` [spark]
HyukjinKwon opened a new pull request, #45745: URL: https://github.com/apache/spark/pull/45745 ### What changes were proposed in this pull request? This PR refines the docstrings of `try_sum`, `try_avg`, `avg`, `sum`, and `mean` with more descriptive examples. ### Why are the changes needed? For better API reference documentation. ### Does this PR introduce _any_ user-facing change? Yes, it fixes user-facing documentation. ### How was this patch tested? Manually tested. GitHub Actions should verify them. ### Was this patch authored or co-authored using generative AI tooling? No
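For readers of this digest, a minimal Scala sketch of the behavioral difference these docstrings describe (the PR itself only touches the Python docstrings; the local session and sample data below are purely illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, mean, sum, try_avg, try_sum}

object TryAggSketch {
  def main(args: Array[String]): Unit = {
    // Local session only for illustration.
    val spark = SparkSession.builder().master("local[*]").appName("try-agg-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(1L, 2L, 3L).toDF("v")
    // sum/avg/mean aggregate as usual; the try_ variants return NULL instead of
    // raising an error when the aggregation fails (for example, overflow under ANSI mode).
    df.select(sum($"v"), try_sum($"v"), avg($"v"), try_avg($"v"), mean($"v")).show()
    spark.stop()
  }
}
```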
[PR] Parse json code generator new [spark]
panbingkun opened a new pull request, #45744: URL: https://github.com/apache/spark/pull/45744 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]
HeartSaVioR commented on PR #45467: URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453733 Thanks! Merging to master.
Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]
HeartSaVioR closed pull request #45467: [SPARK-47363][SS] Initial State without state reader implementation for State API v2. URL: https://github.com/apache/spark/pull/45467
Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]
HeartSaVioR commented on PR #45467: URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453641 CI failure isn't related - only pyspark-connect failed.
Re: [PR] Test scala-maven-plugin 4.9.1 [spark]
LuciferYang commented on code in PR #45716: URL: https://github.com/apache/spark/pull/45716#discussion_r1542333424 ## core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala: ## @@ -68,8 +67,8 @@ } private[serializer] var enableDebugging: Boolean = { -!AccessController.doPrivileged(new sun.security.action.GetBooleanAction( - "sun.io.serialization.extendedDebugInfo")).booleanValue() +!sun.security.action.GetBooleanAction + .privilegedGetProperty("sun.io.serialization.extendedDebugInfo") Review Comment: try fix ``` /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71: object security is not a member of package sun ```
Re: [PR] Test scala-maven-plugin 4.9.1 [spark]
LuciferYang commented on code in PR #45716: URL: https://github.com/apache/spark/pull/45716#discussion_r1542333284 ## core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala: ## @@ -68,8 +67,8 @@ } private[serializer] var enableDebugging: Boolean = { -!AccessController.doPrivileged(new sun.security.action.GetBooleanAction( - "sun.io.serialization.extendedDebugInfo")).booleanValue() +!sun.security.action.GetBooleanAction Review Comment: try fix ``` /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala:71: object security is not a member of package sun ```
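The compile error above comes from referencing the JDK-internal `sun.security.action` package, which newer compiler setups refuse to resolve. As a point of comparison only (not what this PR does), the same flag could be read through the public `java.lang.Boolean` helper, which behaves the same when no `SecurityManager` is installed; in the actual file this is a `private[serializer] var` inside `SerializationDebugger`:

```scala
// Alternative sketch (not the PR's change): avoid the sun.* internal API entirely.
// java.lang.Boolean.getBoolean returns true only when the named system property
// is set to "true", which matches what GetBooleanAction ultimately computes.
object ExtendedDebugInfoFlag {
  val enableDebugging: Boolean =
    !java.lang.Boolean.getBoolean("sun.io.serialization.extendedDebugInfo")
}
```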
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542312888 ## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant; + +import scala.util.control.NonFatal; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.BadRecordException; +import org.apache.spark.sql.errors.QueryExecutionErrors; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantUtil; +import org.apache.spark.types.variant.VariantBuilder; +import org.apache.spark.types.variant.VariantSizeLimitException; +import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; + +/** + * A utility class for constructing variant expressions. + */ +public class VariantExpressionEvalUtils { + + public static VariantVal parseJson(UTF8String input) { +try { + Variant v = VariantBuilder.parseJson(input.toString()); + return new VariantVal(v.getValue(), v.getMetadata()); +} catch (VariantSizeLimitException e) { + throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json"); +} catch (Throwable throwable) { + if (NonFatal.apply(throwable)) { +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( +input.toString(), +new BadRecordException(() -> input, () -> new InternalRow[0], throwable)); + } + throw new Error(throwable); Review Comment: Okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]
attilapiros commented on code in PR #45228: URL: https://github.com/apache/spark/pull/45228#discussion_r1542308491 ## core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala: ## @@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging { /** We use one block manager id as a place holder. */ val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337) - def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = { -if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { - Some(new FallbackStorage(conf)) + // There should be only one fallback storage thread pool per executor. + var fallbackStorage: Option[FallbackStorage] = None + def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized { +if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { + if (fallbackStorage.isDefined) { +val fallbackPath = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get +if (fallbackPath.equals(fallbackStorage.get.fallbackPath.toString)) { + logDebug(s"FallbackStorage defined with path $fallbackPath") + fallbackStorage +} else { + // for unit test. + Some(new FallbackStorage(conf)) +} + } else { +fallbackStorage = Some(new FallbackStorage(conf)) +logInfo(s"Created FallbackStorage $fallbackStorage") +fallbackStorage + } } else { None } } + def getNumReadThreads(conf: SparkConf): Int = { +val numShuffleThreads = + if (conf == null) None else conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ) Review Comment: Same here: Why the null checks for `conf`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
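For context, a sketch of what `getNumReadThreads` might look like with the reviewer's suggestion applied. This is only illustrative: it assumes `conf` can never be null at this call site, that 0 means "thread pool disabled", and that the PR's `STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ` entry lives in `org.apache.spark.internal.config`; none of that is settled by the review thread.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

object FallbackStorageSketch {
  // Sketch only: drop the null guard and fall back to 0 (read thread pool disabled).
  // STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ is the optional entry added by the PR.
  def getNumReadThreads(conf: SparkConf): Int =
    conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ).getOrElse(0)
}
```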
Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]
attilapiros commented on code in PR #45228: URL: https://github.com/apache/spark/pull/45228#discussion_r1542308319 ## core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala: ## @@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging { /** We use one block manager id as a place holder. */ val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337) - def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = { -if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { - Some(new FallbackStorage(conf)) + // There should be only one fallback storage thread pool per executor. + var fallbackStorage: Option[FallbackStorage] = None + def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized { +if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { Review Comment: Why the null checks for `conf`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]
attilapiros commented on code in PR #45228: URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911 ## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ## @@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator( var pushMergedLocalBlockBytes = 0L val prevNumBlocksToFetch = numBlocksToFetch -val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId -val localExecIds = Set(blockManager.blockManagerId.executorId, fallback) +// Fallback to original implementation, if thread pool is not enabled. +val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) { Review Comment: @maheshk114 can you please explain why this change is needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]
attilapiros commented on code in PR #45228: URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911 ## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ## @@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator( var pushMergedLocalBlockBytes = 0L val prevNumBlocksToFetch = numBlocksToFetch -val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId -val localExecIds = Set(blockManager.blockManagerId.executorId, fallback) +// Fallback to original implementation, if thread pool is not enabled. +val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) { Review Comment: @maheshk114 can you explain why this change is needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]
HyukjinKwon closed pull request #45742: [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` URL: https://github.com/apache/spark/pull/45742
Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]
HyukjinKwon commented on PR #45742: URL: https://github.com/apache/spark/pull/45742#issuecomment-2024386921 Merged to master.
Re: [PR] [SPARK-47620][PYTHON][CONNECT] Add a helper function to sort columns [spark]
zhengruifeng commented on PR #45743: URL: https://github.com/apache/spark/pull/45743#issuecomment-2024369477 merged to master
Re: [PR] [SPARK-47620][PYTHON][CONNECT] Add a helper function to sort columns [spark]
zhengruifeng closed pull request #45743: [SPARK-47620][PYTHON][CONNECT] Add a helper function to sort columns URL: https://github.com/apache/spark/pull/45743
Re: [PR] [SPARK-47614][CORE] Rename `JavaModuleOptions` to `JVMRuntimeOptions` [spark]
cloud-fan commented on code in PR #45735: URL: https://github.com/apache/spark/pull/45735#discussion_r1542278308 ## launcher/src/main/java/org/apache/spark/launcher/JVMRuntimeOptions.java: ## @@ -18,14 +18,14 @@ package org.apache.spark.launcher; /** - * This helper class is used to place the all `--add-opens` options - * required by Spark when using Java 17. `DEFAULT_MODULE_OPTIONS` has added + * This helper class is used to place some JVM runtime options(eg: `--add-opens`) + * required by Spark when using Java 17. `DEFAULT_OPTIONS` has added * `-XX:+IgnoreUnrecognizedVMOptions` to be robust. * * @since 3.3.0 Review Comment: If this is a public API, let's not touch it. We can correct the doc comments though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
cloud-fan commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542277307 ## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant; + +import scala.util.control.NonFatal; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.BadRecordException; +import org.apache.spark.sql.errors.QueryExecutionErrors; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantUtil; +import org.apache.spark.types.variant.VariantBuilder; +import org.apache.spark.types.variant.VariantSizeLimitException; +import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; + +/** + * A utility class for constructing variant expressions. + */ +public class VariantExpressionEvalUtils { + + public static VariantVal parseJson(UTF8String input) { +try { + Variant v = VariantBuilder.parseJson(input.toString()); + return new VariantVal(v.getValue(), v.getMetadata()); +} catch (VariantSizeLimitException e) { + throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json"); +} catch (Throwable throwable) { + if (NonFatal.apply(throwable)) { +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( +input.toString(), +new BadRecordException(() -> input, () -> new InternalRow[0], throwable)); + } + throw new Error(throwable); Review Comment: Sorry I missed the `NonFatal` part. Then using Scala is better as it's a bit weird to simulate NonFatal in Java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]
cloud-fan commented on code in PR #45150: URL: https://github.com/apache/spark/pull/45150#discussion_r1542275698 ## connector/connect/common/src/main/protobuf/spark/connect/base.proto: ## @@ -435,6 +438,16 @@ message ExecutePlanResponse { // the execution is complete. If the server sends onComplete without sending a ResultComplete, // it means that there is more, and the client should use ReattachExecute RPC to continue. } + + // This message is used to communicate progress about the query progress during the execution. + message ExecutionProgress { +int64 num_tasks = 1; +int64 num_completed_tasks = 2; Review Comment: After a second thought, it's better to hide Spark internals (stages) to end users, and eventually we should only have one progress bar for the query. So the current PR is a good starting point. However, this server-client protocol needs to be stable and we don't want to change the client frequently to improve the progress reporting. Can we define a minimum set of information we need to send to the client side to display the progress bar? I feel it's better to calculate the percentage at the server side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
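To make the suggestion above concrete, a small illustrative sketch (names are made up, and this is not the actual protocol or server code): if the server reduces its internal counters to a single bounded fraction before sending it, the wire format can stay stable even when the underlying metrics (tasks, stages) change later.

```scala
// Illustrative only: derive a progress fraction on the server side,
// so clients never need to know about tasks or stages.
object ProgressFractionSketch {
  def fraction(numCompletedTasks: Long, numTasks: Long): Double =
    if (numTasks <= 0L) 0.0
    else math.min(numCompletedTasks.toDouble / numTasks, 1.0)

  def main(args: Array[String]): Unit = {
    println(f"query progress: ${fraction(42, 168) * 100}%.1f%%") // query progress: 25.0%
  }
}
```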
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542275472 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant + +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.util.BadRecordException +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.types.variant.{VariantBuilder, VariantSizeLimitException, VariantUtil} +import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +/** + * A utility class for constructing variant expressions. + */ +object VariantExpressionEvalUtils { + + def parseJson(input: UTF8String): VariantVal = { +try { + val v = VariantBuilder.parseJson(input.toString) + new VariantVal(v.getValue, v.getMetadata) +} catch { + case _: VariantSizeLimitException => +throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json") + case NonFatal(e) => +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( Review Comment: > can we make these two methods return `SparkRuntimeException`? Then we can write the code in Java. Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542275365 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant + +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.util.BadRecordException +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.types.variant.{VariantBuilder, VariantSizeLimitException, VariantUtil} +import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +/** + * A utility class for constructing variant expressions. + */ +object VariantExpressionEvalUtils { + + def parseJson(input: UTF8String): VariantVal = { +try { + val v = VariantBuilder.parseJson(input.toString) + new VariantVal(v.getValue, v.getMetadata) +} catch { + case _: VariantSizeLimitException => +throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json") + case NonFatal(e) => +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( Review Comment: The scala version of `VariantExpressionEvalUtils` seen in `decompilation`, as above. ## sql/core/src/test/scala/org/apache/spark/sql/VariantFunctionSuite.scala: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.expressions.{CreateArray, CreateNamedStruct, Literal, StructsToJson} +import org.apache.spark.sql.catalyst.expressions.variant.ParseJson +import org.apache.spark.sql.execution.WholeStageCodegenExec +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.types.variant.VariantBuilder +import org.apache.spark.unsafe.types.VariantVal + +class VariantFunctionSuite extends QueryTest with SharedSparkSession { Review Comment: Done -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024345469 Thank you for the review, @yaooqinn and @viirya. I might have overlooked side-effect cases for other object stores. I'll convert this PR to `Draft` and test with Google Cloud Storage at least.
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542274672 ## sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant; + +import scala.util.control.NonFatal; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.BadRecordException; +import org.apache.spark.sql.errors.QueryExecutionErrors; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantUtil; +import org.apache.spark.types.variant.VariantBuilder; +import org.apache.spark.types.variant.VariantSizeLimitException; +import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.unsafe.types.VariantVal; + +/** + * A utility class for constructing variant expressions. + */ +public class VariantExpressionEvalUtils { + + public static VariantVal parseJson(UTF8String input) { +try { + Variant v = VariantBuilder.parseJson(input.toString()); + return new VariantVal(v.getValue(), v.getMetadata()); +} catch (VariantSizeLimitException e) { + throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json"); +} catch (Throwable throwable) { + if (NonFatal.apply(throwable)) { +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( +input.toString(), +new BadRecordException(() -> input, () -> new InternalRow[0], throwable)); + } + throw new Error(throwable); Review Comment: Although there are some `differences` in the logic seen when `decompiling` Scala, I think `throwing an Error` is a good compromise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Test scala-maven-plugin 4.9.1 [spark]
LuciferYang commented on code in PR #45716: URL: https://github.com/apache/spark/pull/45716#discussion_r1542270189 ## pom.xml: ## @@ -176,7 +176,7 @@ 2.13 2.2.0 -4.7.1 +4.8.1 Review Comment: re-test 4.8.1 first
Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]
cloud-fan closed pull request #45703: [SPARK-47546][SQL] Improve validation when reading Variant from Parquet URL: https://github.com/apache/spark/pull/45703
Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]
cloud-fan commented on PR #45703: URL: https://github.com/apache/spark/pull/45703#issuecomment-2024338004 thanks, merging to master!
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on code in PR #45740: URL: https://github.com/apache/spark/pull/45740#discussion_r1542269102 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -420,6 +420,10 @@ class SparkContext(config: SparkConf) extends Logging { // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s // We can remove this after Apache Hadoop 3.4.1 releases conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", "30s") +// Enable Magic Committer by default for all S3 buckets if hadoop-cloud module exists +if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) { + conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true") +} Review Comment: Yes, at the first commit test on this PR, CI shows the following. - https://github.com/dongjoon-hyun/spark/actions/runs/8458713094/job/23173570901 ``` [info] - SPARK-23731 plans should be canonicalizable after being (de)serialized *** FAILED *** (53 milliseconds) [info] java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol [info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) [info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) [info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) [info] at java.base/java.lang.Class.forName0(Native Method) [info] at java.base/java.lang.Class.forName(Class.java:467) [info] at org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:41) [info] at org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:36) [info] at org.apache.spark.util.Utils$.classForName(Utils.scala:97) [info] at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:213) [info] at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
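One practical consequence of the diff quoted above: because the new default is applied with `setIfMissing`, any value set explicitly by the user still takes precedence. A hedged opt-out sketch (illustrative configuration only, not part of the PR):

```scala
import org.apache.spark.SparkConf

object MagicCommitterOptOutSketch {
  // An explicit user value (SparkConf, spark-defaults.conf, or --conf on spark-submit)
  // wins over the setIfMissing default that the PR adds in SparkContext.
  val conf: SparkConf = new SparkConf()
    .set("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "false")
}
```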
Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]
cloud-fan commented on code in PR #45649: URL: https://github.com/apache/spark/pull/45649#discussion_r1542268213 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala: ## @@ -35,12 +35,84 @@ case class With(child: Expression, defs: Seq[CommonExpressionDef]) newChildren: IndexedSeq[Expression]): Expression = { copy(child = newChildren.head, defs = newChildren.tail.map(_.asInstanceOf[CommonExpressionDef])) } + + /** + * Builds a map of ids (originally assigned ids -> canonicalized ids) to be re-assigned during + * canonicalization. + */ + protected lazy val canonicalizationIdMap: Map[Long, Long] = { +// Start numbering after taking into account all nested With expression id maps. +var currentId = child.map { + case w: With => w.canonicalizationIdMap.size + case _ => 0L +}.sum +defs.map { d => + currentId += 1 + d.id.id -> currentId +}.toMap + } + + /** + * Canonicalize by re-assigning all ids in CommonExpressionRef's and CommonExpressionDef's + * starting from 0. This uses [[canonicalizationIdMap]], which contains all mappings for + * CommonExpressionDef's defined in this scope. + * Note that this takes into account nested With expressions by sharing a numbering scope (see + * [[canonicalizationIdMap]]. + */ + override lazy val canonicalized: Expression = copy( +child = child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) { + case r: CommonExpressionRef if !r.id.canonicalized => +r.copy(id = r.id.canonicalize(canonicalizationIdMap)) +}.canonicalized, +defs = defs.map { + case d: CommonExpressionDef if !d.id.canonicalized => +d.copy(id = d.id.canonicalize(canonicalizationIdMap)).canonicalized + .asInstanceOf[CommonExpressionDef] + case d => d.canonicalized.asInstanceOf[CommonExpressionDef] +} + ) +} + +object With { + /** + * Helper function to create a [[With]] statement with an arbitrary number of common expressions. + * Note that the number of arguments in `commonExprs` should be the same as the number of + * arguments taken by `replaced`. + * + * @param commonExprs list of common expressions + * @param replacedclosure that defines the common expressions in the main expression + * @return the expression returned by replaced with its arguments replaced by commonExprs in order + */ + def apply(commonExprs: Expression*)(replaced: Seq[Expression] => Expression): With = { +val commonExprDefs = commonExprs.map(CommonExpressionDef(_)) +val commonExprRefs = commonExprDefs.map(new CommonExpressionRef(_)) +With(replaced(commonExprRefs), commonExprDefs) + } +} + +case class CommonExpressionId(id: Long = CommonExpressionId.newId, canonicalized: Boolean = false) { Review Comment: In `QueryPlan` we have this ``` /** * A private mutable variable to indicate whether this plan is the result of canonicalization. * This is used solely for making sure we wouldn't execute a canonicalized plan. * See [[canonicalized]] on how this is set. */ @transient private var _isCanonicalizedPlan: Boolean = false protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan ``` Shall we do the same in `With`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]
cloud-fan commented on code in PR #45649: URL: https://github.com/apache/spark/pull/45649#discussion_r1542268213 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala: ## @@ -35,12 +35,84 @@ case class With(child: Expression, defs: Seq[CommonExpressionDef]) newChildren: IndexedSeq[Expression]): Expression = { copy(child = newChildren.head, defs = newChildren.tail.map(_.asInstanceOf[CommonExpressionDef])) } + + /** + * Builds a map of ids (originally assigned ids -> canonicalized ids) to be re-assigned during + * canonicalization. + */ + protected lazy val canonicalizationIdMap: Map[Long, Long] = { +// Start numbering after taking into account all nested With expression id maps. +var currentId = child.map { + case w: With => w.canonicalizationIdMap.size + case _ => 0L +}.sum +defs.map { d => + currentId += 1 + d.id.id -> currentId +}.toMap + } + + /** + * Canonicalize by re-assigning all ids in CommonExpressionRef's and CommonExpressionDef's + * starting from 0. This uses [[canonicalizationIdMap]], which contains all mappings for + * CommonExpressionDef's defined in this scope. + * Note that this takes into account nested With expressions by sharing a numbering scope (see + * [[canonicalizationIdMap]]. + */ + override lazy val canonicalized: Expression = copy( +child = child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) { + case r: CommonExpressionRef if !r.id.canonicalized => +r.copy(id = r.id.canonicalize(canonicalizationIdMap)) +}.canonicalized, +defs = defs.map { + case d: CommonExpressionDef if !d.id.canonicalized => +d.copy(id = d.id.canonicalize(canonicalizationIdMap)).canonicalized + .asInstanceOf[CommonExpressionDef] + case d => d.canonicalized.asInstanceOf[CommonExpressionDef] +} + ) +} + +object With { + /** + * Helper function to create a [[With]] statement with an arbitrary number of common expressions. + * Note that the number of arguments in `commonExprs` should be the same as the number of + * arguments taken by `replaced`. + * + * @param commonExprs list of common expressions + * @param replacedclosure that defines the common expressions in the main expression + * @return the expression returned by replaced with its arguments replaced by commonExprs in order + */ + def apply(commonExprs: Expression*)(replaced: Seq[Expression] => Expression): With = { +val commonExprDefs = commonExprs.map(CommonExpressionDef(_)) +val commonExprRefs = commonExprDefs.map(new CommonExpressionRef(_)) +With(replaced(commonExprRefs), commonExprDefs) + } +} + +case class CommonExpressionId(id: Long = CommonExpressionId.newId, canonicalized: Boolean = false) { Review Comment: In query plan we have this ``` /** * A private mutable variable to indicate whether this plan is the result of canonicalization. * This is used solely for making sure we wouldn't execute a canonicalized plan. * See [[canonicalized]] on how this is set. */ @transient private var _isCanonicalizedPlan: Boolean = false protected def isCanonicalizedPlan: Boolean = _isCanonicalizedPlan ``` Shall we do the same in `With`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]
HyukjinKwon commented on PR #45742: URL: https://github.com/apache/spark/pull/45742#issuecomment-2024333559 cc @zhengruifeng
[PR] [SPARK-47619][PYTHON][DOCS] Refine docstring of `to_json/from_json` [spark]
HyukjinKwon opened a new pull request, #45742: URL: https://github.com/apache/spark/pull/45742 ### What changes were proposed in this pull request? This PR refines the docstrings of `to_json`/`from_json` with more descriptive examples. ### Why are the changes needed? For better API reference documentation. ### Does this PR introduce _any_ user-facing change? Yes, it fixes user-facing documentation. ### How was this patch tested? Manually tested. GitHub Actions should verify them. ### Was this patch authored or co-authored using generative AI tooling? No
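For reference, the round trip these docstrings document, sketched with the Scala DataFrame API (the PR itself only changes the Python docstrings; session setup and sample data are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object JsonRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("json-roundtrip").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 7)).toDF("id", "a")
    // Struct -> JSON string with to_json, then JSON string -> struct with from_json.
    val asJson = df.select($"id", to_json(struct($"a")).as("j"))
    val schema = StructType(Seq(StructField("a", IntegerType)))
    asJson.select($"id", from_json($"j", schema).as("s")).show()
    spark.stop()
  }
}
```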
Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]
cloud-fan commented on code in PR #45649: URL: https://github.com/apache/spark/pull/45649#discussion_r1542266859 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/With.scala: ## @@ -35,12 +35,84 @@ case class With(child: Expression, defs: Seq[CommonExpressionDef]) newChildren: IndexedSeq[Expression]): Expression = { copy(child = newChildren.head, defs = newChildren.tail.map(_.asInstanceOf[CommonExpressionDef])) } + + /** + * Builds a map of ids (originally assigned ids -> canonicalized ids) to be re-assigned during + * canonicalization. + */ + protected lazy val canonicalizationIdMap: Map[Long, Long] = { Review Comment: ```suggestion private lazy val canonicalizationIdMap: Map[Long, Long] = { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
gene-db commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542249535 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa + +Returns +--- +:class:`~pyspark.sql.Column` +a new column of VariantType. + +Examples + +>>> from pyspark.sql.types import * Review Comment: removed. ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( Review Comment: done. ## sql/core/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -6594,6 +6594,24 @@ object functions { fnWithOptions("from_json", options, e, schema) } + /** + * Parses a JSON string and constructs a Variant value. + * + * @param json a JSON string. + * + * @since 4.0.0 + */ + def parse_json(json: String): Column = parse_json(lit(json)) Review Comment: removed. ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa + +Returns +--- +:class:`~pyspark.sql.Column` +a new column of VariantType. + +Examples + +>>> from pyspark.sql.types import * +>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ]) +>>> df.select(to_json(parse_json(df.json)).alias("v")).collect() Review Comment: Removed alias. ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa Review Comment: removed this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
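A hedged usage sketch of the Scala side of this change, assuming the `Column`-based `parse_json` that the PR adds to `org.apache.spark.sql.functions` (the `String` overload shown in the diff was removed during review). It mirrors the docstring example discussed above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{parse_json, to_json}

object ParseJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("parse-json-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("""{ "a" : 1 }""").toDF("json")
    // parse_json produces a VARIANT value; to_json renders it back as a JSON string.
    df.select(to_json(parse_json($"json"))).show()
    spark.stop()
  }
}
```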
Re: [PR] [SPARK-47491][CORE] Add `slf4j-api` jar to the class path first before the others of `jars` directory [spark]
LuciferYang commented on PR #45618: URL: https://github.com/apache/spark/pull/45618#issuecomment-2024317505 @dongjoon-hyun It's a bit magical: I found that in the latest test, `Run / Build modules using Maven: repl,sql#hive-thriftserver` also turned green, but until yesterday it was still red. May I ask if this is an additional benefit of this PR? - 14 hours ago: https://github.com/apache/spark/actions/runs/8452197772/job/23152049324 - 2 days ago: https://github.com/apache/spark/actions/runs/8436403960/job/23138388265
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
viirya commented on code in PR #45740: URL: https://github.com/apache/spark/pull/45740#discussion_r1542253614 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -420,6 +420,10 @@ class SparkContext(config: SparkConf) extends Logging { // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s // We can remove this after Apache Hadoop 3.4.1 releases conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", "30s") +// Enable Magic Committer by default for all S3 buckets if hadoop-cloud module exists +if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) { + conf.setIfMissing("spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled", "true") +} Review Comment: If the class is not there, will setting the config cause any problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
zhengruifeng commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542253499 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa + +Returns +--- +:class:`~pyspark.sql.Column` +a new column of VariantType. + +Examples + +>>> from pyspark.sql.types import * +>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ]) +>>> df.select(to_json(parse_json(df.json)).alias("v")).collect() Review Comment: let's do not use alias to also verify the default column name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47491][CORE] Add `slf4j-api` jar to the class path first before the others of `jars` directory [spark]
LuciferYang commented on PR #45618: URL: https://github.com/apache/spark/pull/45618#issuecomment-2024309323 late LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
yaooqinn commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024306179 LGTM from my side. Thank you @dongjoon-hyun -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46894][PYTHON] Move PySpark error conditions into standalone JSON file [spark]
HyukjinKwon commented on PR #44920: URL: https://github.com/apache/spark/pull/44920#issuecomment-2024304980 Just to make sure, does it work when you install PySpark as a ZIP file? e.g., downloading it from https://spark.apache.org/downloads.html would install PySpark as a ZIP file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]
yaooqinn commented on PR #45620: URL: https://github.com/apache/spark/pull/45620#issuecomment-2024304655 FYI, ANSI SQL STANDARD 1999 ![image](https://github.com/apache/spark/assets/8326978/772375de-11cd-48d0-85ae-71adbbcb5b78) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on code in PR #45740: URL: https://github.com/apache/spark/pull/45740#discussion_r1542247573 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -420,6 +420,14 @@ class SparkContext(config: SparkConf) extends Logging { // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s // We can remove this after Apache Hadoop 3.4.1 releases conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", "30s") +try { + // Try to enable Magic Committer by default for all buckets + Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol") Review Comment: Thank you, @yaooqinn . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47559][SQL] Codegen Support for variant `parse_json` [spark]
panbingkun commented on code in PR #45714: URL: https://github.com/apache/spark/pull/45714#discussion_r1542242532 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala: ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.variant + +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.util.BadRecordException +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.types.variant.{VariantBuilder, VariantSizeLimitException, VariantUtil} +import org.apache.spark.unsafe.types.{UTF8String, VariantVal} + +/** + * A utility class for constructing variant expressions. + */ +object VariantExpressionEvalUtils { + + def parseJson(input: UTF8String): VariantVal = { +try { + val v = VariantBuilder.parseJson(input.toString) + new VariantVal(v.getValue, v.getMetadata) +} catch { + case _: VariantSizeLimitException => +throw QueryExecutionErrors.variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json") + case NonFatal(e) => +throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( Review Comment: https://github.com/apache/spark/assets/15246973/7eef5c95-e6a1-4f02-90db-05526c68b9fe;> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]
yaooqinn commented on PR #45620: URL: https://github.com/apache/spark/pull/45620#issuecomment-2024294704 Interesting, the Java platform says it is whitespace: https://docs.oracle.com/javase/8/docs/api/java/lang/Character.html#isWhitespace-char- -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
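A small stand-alone probe (not from the PR) that prints how `java.lang.Character.isWhitespace` classifies a few candidate separator code points, which is the platform behavior referenced above; the selection of code points is illustrative only.

```scala
// Quick exploration of the JDK's whitespace classification, as context for the
// "widen whitespace rules in lexer" discussion. Prints whatever the JDK reports.
object WhitespaceProbe {
  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      ' '      -> "space",
      '\t'     -> "tab",
      '\n'     -> "line feed",
      '\u000B' -> "vertical tab",
      '\u00A0' -> "no-break space",
      '\u2028' -> "line separator"
    )
    candidates.foreach { case (ch, name) =>
      println(f"U+${ch.toInt}%04X $name%-16s isWhitespace=${Character.isWhitespace(ch)}")
    }
  }
}
```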
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
yaooqinn commented on code in PR #45740: URL: https://github.com/apache/spark/pull/45740#discussion_r1542229026 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -420,6 +420,14 @@ class SparkContext(config: SparkConf) extends Logging { // HADOOP-19097 Set fs.s3a.connection.establish.timeout to 30s // We can remove this after Apache Hadoop 3.4.1 releases conf.setIfMissing("spark.hadoop.fs.s3a.connection.establish.timeout", "30s") +try { + // Try to enable Magic Committer by default for all buckets + Utils.classForName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol") Review Comment: Use `Utils.classIsLoadable`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
HyukjinKwon commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542190834 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( Review Comment: should be listed in `python/docs/source/reference/pyspark.sql/functions.rst` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
HyukjinKwon commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542190297 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa + +Returns +--- +:class:`~pyspark.sql.Column` +a new column of VariantType. + +Examples + +>>> from pyspark.sql.types import * +>>> df = spark.createDataFrame([ {'json': '''{ "a" : 1 }'''} ]) +>>> df.select(to_json(parse_json(df.json)).alias("v")).collect() Review Comment: Let's probably use `show()`. instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
HyukjinKwon commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542190658 ## sql/core/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -6594,6 +6594,24 @@ object functions { fnWithOptions("from_json", options, e, schema) } + /** + * Parses a JSON string and constructs a Variant value. + * + * @param json a JSON string. + * + * @since 4.0.0 + */ + def parse_json(json: String): Column = parse_json(lit(json)) Review Comment: I would remove this String type signature -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
HyukjinKwon commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542190216 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa + +Returns +--- +:class:`~pyspark.sql.Column` +a new column of VariantType. + +Examples + +>>> from pyspark.sql.types import * Review Comment: this too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47366][PYTHON] Add pyspark and dataframe parse_json aliases [spark]
HyukjinKwon commented on code in PR #45741: URL: https://github.com/apache/spark/pull/45741#discussion_r1542190154 ## python/pyspark/sql/functions/builtin.py: ## @@ -15098,6 +15098,38 @@ def from_json( return _invoke_function("from_json", _to_java_column(col), schema, _options_to_str(options)) +@_try_remote_functions +def parse_json( +col: "ColumnOrName", +) -> Column: +""" +Parses a column containing a JSON string into a :class:`VariantType`. + +.. versionadded:: 4.0.0 + +Parameters +-- +col : :class:`~pyspark.sql.Column` or str +a column or column name JSON formatted strings + +.. # noqa Review Comment: I think we don't need this (?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class [spark]
cloud-fan commented on code in PR #44942: URL: https://github.com/apache/spark/pull/44942#discussion_r1542184876 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -60,23 +60,15 @@ case class UnaryMinus( override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match { case _: DecimalType => defineCodeGen(ctx, ev, c => s"$c.unary_$$minus()") -case ByteType | ShortType if failOnError => +case ByteType | ShortType | IntegerType | LongType if failOnError => + val typeUtils = TypeUtils.getClass.getCanonicalName.stripSuffix("$") + val refDataType = ctx.addReferenceObj("refDataType", dataType, dataType.getClass.getName) nullSafeCodeGen(ctx, ev, eval => { val javaBoxedType = CodeGenerator.boxedType(dataType) -val javaType = CodeGenerator.javaType(dataType) -val originValue = ctx.freshName("origin") s""" - |$javaType $originValue = ($javaType)($eval); - |if ($originValue == $javaBoxedType.MIN_VALUE) { - | throw QueryExecutionErrors.unaryMinusCauseOverflowError($originValue); - |} - |${ev.value} = ($javaType)(-($originValue)); - """.stripMargin - }) -case IntegerType | LongType if failOnError => - val mathUtils = MathUtils.getClass.getCanonicalName.stripSuffix("$") - nullSafeCodeGen(ctx, ev, eval => { -s"${ev.value} = $mathUtils.negateExact($eval);" Review Comment: The previous code is more efficient as it leverages the static data type information, while the new code simply passes the data type as a parameter, which means we will do data type case match per row. It will cause significant perf regression. I suggest that we add more overloads of `MathUtils.negateExact` to take byte and short, so that the generated code can still call it directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
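A hypothetical sketch of the extra `negateExact` overloads suggested here, so the generated code could keep calling a static helper directly for byte and short as it already does for int and long. This is not the actual Spark `MathUtils`; the real version would raise Spark's error class rather than a plain `ArithmeticException`.

```scala
// Hypothetical byte/short overloads; semantics only, not the PR's implementation.
object NegateExactSketch {
  def negateExact(b: Byte): Byte = {
    if (b == Byte.MinValue) throw new ArithmeticException("byte overflow on negation")
    (-b).toByte
  }

  def negateExact(s: Short): Short = {
    if (s == Short.MinValue) throw new ArithmeticException("short overflow on negation")
    (-s).toShort
  }

  def main(args: Array[String]): Unit = {
    println(negateExact(5.toByte))    // -5
    println(negateExact(-7.toShort))  // 7
  }
}
```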
Re: [PR] [SPARK-47543][CONNECT][PYTHON] Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation [spark]
HyukjinKwon closed pull request #45699: [SPARK-47543][CONNECT][PYTHON] Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation URL: https://github.com/apache/spark/pull/45699 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47543][CONNECT][PYTHON] Inferring `dict` as `MapType` from Pandas DataFrame to allow DataFrame creation [spark]
HyukjinKwon commented on PR #45699: URL: https://github.com/apache/spark/pull/45699#issuecomment-2024197689 Merged to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47366] Add pyspark and dataframe parse_json aliases [spark]
gene-db opened a new pull request, #45741: URL: https://github.com/apache/spark/pull/45741 ### What changes were proposed in this pull request? Added the `parse_json` function alias for pyspark and dataframe APIs. ### Why are the changes needed? Improves usability of the `parse_json` function. ### Does this PR introduce _any_ user-facing change? Before this change, the following would not be possible: ``` df.select(parse_json(df.json)).collect() ``` ### How was this patch tested? Added unit tests and manual testing. ### Was this patch authored or co-authored using generative AI tooling? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024185868 All tests passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47511][SQL] Canonicalize With expressions by re-assigning IDs [spark]
kelvinjian-db commented on PR #45649: URL: https://github.com/apache/spark/pull/45649#issuecomment-2024173902 @cloud-fan @bersprockets can you help review this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47569][SQL] Disallow comparing variant. [spark]
chenhao-db commented on PR #45726: URL: https://github.com/apache/spark/pull/45726#issuecomment-2024158327 @cloud-fan Could you help review this PR? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]
jingz-db commented on code in PR #45467: URL: https://github.com/apache/spark/pull/45467#discussion_r1542128879 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -271,57 +320,111 @@ case class TransformWithStateExec( case _ => } -if (isStreaming) { - child.execute().mapPartitionsWithStateStore[InternalRow]( +if (hasInitialState) { + val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf) + val hadoopConfBroadcast = sparkContext.broadcast( +new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf())) + child.execute().stateStoreAwareZipPartitions( +initialState.execute(), getStateInfo, -schemaForKeyRow, -schemaForValueRow, -NoPrefixKeyStateEncoderSpec(schemaForKeyRow), -session.sqlContext.sessionState, -Some(session.sqlContext.streams.stateStoreCoordinator), -useColumnFamilies = true, -useMultipleValuesPerKey = true - ) { -case (store: StateStore, singleIterator: Iterator[InternalRow]) => - processData(store, singleIterator) +storeNames = Seq(), +session.sqlContext.streams.stateStoreCoordinator) { +// The state store aware zip partitions will provide us with two iterators, +// child data iterator and the initial state iterator per partition. +case (partitionId, childDataIterator, initStateIterator) => + if (isStreaming) { +val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation, + stateInfo.get.operatorId, partitionId) +val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId) +val store = StateStore.get( + storeProviderId = storeProviderId, + keySchema = schemaForKeyRow, + valueSchema = schemaForValueRow, + NoPrefixKeyStateEncoderSpec(schemaForKeyRow), + version = stateInfo.get.storeVersion, + useColumnFamilies = true, + storeConf = storeConf, + hadoopConf = hadoopConfBroadcast.value.value +) + +processDataWithInitialState(store, childDataIterator, initStateIterator) + } else { +val providerId = { + val tempDirPath = Utils.createTempDir().getAbsolutePath + new StateStoreProviderId( +StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId) +} +val sqlConf = new SQLConf() +sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key, + classOf[RocksDBStateStoreProvider].getName) + +// Create StateStoreProvider for this partition +val stateStoreProvider = StateStoreProvider.createAndInit( + providerId, + schemaForKeyRow, + schemaForValueRow, + NoPrefixKeyStateEncoderSpec(schemaForKeyRow), + useColumnFamilies = true, + storeConf = new StateStoreConf(sqlConf), + hadoopConf = hadoopConfBroadcast.value.value, + useMultipleValuesPerKey = true) +val store = stateStoreProvider.getStore(0) + +processDataWithInitialState(store, childDataIterator, initStateIterator) Review Comment: Good advice! Refactored duplicated codes into `initNewStateStoreAndProcessData()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
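A simplified, hypothetical outline (plain Scala, with stand-in types instead of `StateStore` and `InternalRow`) of the shape such a shared helper can take: both the streaming and batch branches build their store and then run the same processing path. The parameter list of the real `initNewStateStoreAndProcessData` in the PR may differ.

```scala
// Hypothetical outline of factoring the duplicated branches into one helper.
object InitStateRefactorSketch {
  // Stand-ins for Spark's StateStore and row iterators.
  final case class Store(id: String)
  type Rows = Iterator[String]

  def initNewStateStoreAndProcessData(
      partitionId: Int,
      isStreaming: Boolean,
      childData: Rows,
      initState: Rows): List[String] = {
    // In the real operator this would call StateStore.get(...) for streaming or
    // StateStoreProvider.createAndInit(...).getStore(0) for batch.
    val store = if (isStreaming) Store(s"checkpointed-$partitionId") else Store(s"temp-$partitionId")
    (initState ++ childData).map(row => s"${store.id}: $row").toList
  }

  def main(args: Array[String]): Unit = {
    val out = initNewStateStoreAndProcessData(0, isStreaming = true,
      Iterator("a", "b"), Iterator("init"))
    out.foreach(println)
  }
}
```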
Re: [PR] [SPARK-47546][SQL] Improve validation when reading Variant from Parquet [spark]
cashmand commented on PR #45703: URL: https://github.com/apache/spark/pull/45703#issuecomment-2024035501 @cloud-fan @dongjoon-hyun It looks like the checks I added are only effective for the vectorized reader, and I probably need to add similar checks for parquet-mr (and modify my unit test to test with and without parquet-mr). I think it's fine to merge this PR and make the parquet-mr fixes in a follow-up, but if you prefer, I can work on adding the checks for parquet-mr to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]
HeartSaVioR closed pull request #45485: [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source URL: https://github.com/apache/spark/pull/45485 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47107][SS][PYTHON] Implement partition reader for python streaming data source [spark]
HeartSaVioR commented on PR #45485: URL: https://github.com/apache/spark/pull/45485#issuecomment-2024028916 In the meantime, I'm merging this to master as CI has passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024025159 Thank you, @viirya . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
sahnib commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1542070936 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -90,16 +92,13 @@ case class TransformWithStateExec( override def keyExpressions: Seq[Attribute] = groupingAttributes - protected val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) - - protected val schemaForValueRow: StructType = new StructType().add("value", BinaryType) - override def requiredChildDistribution: Seq[Distribution] = { StatefulOperatorPartitioning.getCompatibleDistribution(groupingAttributes, getStateInfo, conf) :: Nil } + Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
sahnib commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1542070036 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -26,10 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expressi import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.Distribution import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} Review Comment: Yeah, makes sense. Renamed this to `TransformWithStateKeyValueRowSchema`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]
amaliujia commented on code in PR #45729: URL: https://github.com/apache/spark/pull/45729#discussion_r1542066261 ## common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala: ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +import java.io.{ByteArrayOutputStream, PrintStream} + +import org.apache.commons.io.output.TeeOutputStream +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXECUTOR_ID + +abstract class LoggingSuiteBase +extends AnyFunSuite // scalastyle:ignore funsuite +with BeforeAndAfterAll +with BeforeAndAfterEach +with Logging { + protected val outContent = new ByteArrayOutputStream() + protected val originalErr = System.err + + override def beforeAll(): Unit = { +val teeStream = new TeeOutputStream(originalErr, outContent) +System.setErr(new PrintStream(teeStream)) + } + + override def afterAll(): Unit = { +System.setErr(originalErr) + } + + override def afterEach(): Unit = { +outContent.reset() + } +} + +class StructuredLoggingSuite extends LoggingSuiteBase { + val className = this.getClass.getName.stripSuffix("$") + override def beforeAll(): Unit = { +super.beforeAll() +Logging.enableStructuredLogging() + } + + test("Structured logging") { +val msg = "This is a log message" +logError(msg) + +val logOutput = outContent.toString.split("\n").filter(_.contains(msg)).head +assert(logOutput.nonEmpty) +// scalastyle:off line.size.limit +val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"$className"}""".r +// scalastyle:on +assert(pattern.matches(logOutput)) Review Comment: Regarding to the test case readability, I am wondering if we at last put the value of the `logOutput` as a comment here with newlines and whitespaces inserted to have better readability, so we can read the pattern and then quickly read the value in comment to understand what this test case does? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
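A self-contained sketch of the reviewer's suggestion: keep the regex assertion but pair it with a readable sample of the expected log line in a comment. The sample JSON line and the logger value below are illustrative, not captured from Spark.

```scala
// Sketch: show an example of the expected structured log line next to the pattern,
// so the test's intent is readable at a glance.
object StructuredLogPatternCheck {
  def main(args: Array[String]): Unit = {
    // Example of what a structured log line is expected to look like:
    //   {"ts":"2024-03-28T01:23:45.678Z","level":"ERROR",
    //    "msg":"This is a log message","logger":"SomeSuite"}
    val logOutput =
      """{"ts":"2024-03-28T01:23:45.678Z","level":"ERROR","msg":"This is a log message","logger":"SomeSuite"}"""
    val pattern =
      """\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"[^"]+"}""".r
    assert(pattern.matches(logOutput), "log line did not match the expected JSON shape")
    println("pattern matched")
  }
}
```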
Re: [PR] [SPARK-47574][INFRA] Introduce Structured Logging Framework [spark]
amaliujia commented on code in PR #45729: URL: https://github.com/apache/spark/pull/45729#discussion_r1542066757 ## common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala: ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +import java.io.{ByteArrayOutputStream, PrintStream} + +import org.apache.commons.io.output.TeeOutputStream +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXECUTOR_ID + +abstract class LoggingSuiteBase +extends AnyFunSuite // scalastyle:ignore funsuite +with BeforeAndAfterAll +with BeforeAndAfterEach +with Logging { + protected val outContent = new ByteArrayOutputStream() + protected val originalErr = System.err + + override def beforeAll(): Unit = { +val teeStream = new TeeOutputStream(originalErr, outContent) +System.setErr(new PrintStream(teeStream)) + } + + override def afterAll(): Unit = { +System.setErr(originalErr) + } + + override def afterEach(): Unit = { +outContent.reset() + } +} + +class StructuredLoggingSuite extends LoggingSuiteBase { + val className = this.getClass.getName.stripSuffix("$") + override def beforeAll(): Unit = { +super.beforeAll() +Logging.enableStructuredLogging() + } + + test("Structured logging") { +val msg = "This is a log message" +logError(msg) + +val logOutput = outContent.toString.split("\n").filter(_.contains(msg)).head +assert(logOutput.nonEmpty) +// scalastyle:off line.size.limit +val pattern = s"""\\{"ts":"[^"]+","level":"ERROR","msg":"This is a log message","logger":"$className"}""".r +// scalastyle:on +assert(pattern.matches(logOutput)) Review Comment: Please ignore if the value is not stable thus whenever we put it in the comment, it becomes stable very soon. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024012897 Also, cc @viirya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
sahnib commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1542064062 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -77,14 +78,20 @@ class StatefulProcessorHandleImpl( store: StateStore, runId: UUID, keyEncoder: ExpressionEncoder[Any], +ttlMode: TTLMode, timeoutMode: TimeoutMode, -isStreaming: Boolean = true) +isStreaming: Boolean = true, +batchTimestampMs: Option[Long] = None, +eventTimeWatermarkMs: Option[Long] = None) extends StatefulProcessorHandle with Logging { import StatefulProcessorHandleState._ + private val ttlStates: util.List[TTLState] = new util.ArrayList[TTLState]() + private val BATCH_QUERY_ID = "----" - private def buildQueryInfo(): QueryInfo = { + logInfo(s"Created StatefulProcessorHandle") Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun commented on PR #45740: URL: https://github.com/apache/spark/pull/45740#issuecomment-2024005090 Hi, do you have any concerns about using the S3 Magic Committer by default, @steveloughran? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47618][CORE] Use `Magic Committer` for all S3 buckets by default [spark]
dongjoon-hyun opened a new pull request, #45740: URL: https://github.com/apache/spark/pull/45740 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]
sebastianhillig-db commented on code in PR #45635: URL: https://github.com/apache/spark/pull/45635#discussion_r1542043094 ## core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala: ## @@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory( def create(): (PythonWorker, Option[Long]) = { if (useDaemon) { self.synchronized { -if (idleWorkers.nonEmpty) { +// Pull from idle workers until we one that is alive, otherwise create a new one. +while (idleWorkers.nonEmpty) { val worker = idleWorkers.dequeue() - worker.selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE) - return (worker, daemonWorkers.get(worker)) + val workerHandle = daemonWorkers(worker) Review Comment: Since the idleWorkers queue keeps the reference alive, there should never be a case where the daemonWorker disappears until it is retrieved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]
sebastianhillig-db commented on code in PR #45635: URL: https://github.com/apache/spark/pull/45635#discussion_r1542043889 ## core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala: ## @@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory( def create(): (PythonWorker, Option[Long]) = { if (useDaemon) { self.synchronized { -if (idleWorkers.nonEmpty) { +// Pull from idle workers until we one that is alive, otherwise create a new one. +while (idleWorkers.nonEmpty) { Review Comment: Promise not to force push again: https://github.com/apache/spark/pull/45635/files#diff-1bd846874b06327e6abd0803aa74eed890352dfa974d5c1da1a12dc7477e20d0L411 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]
sebastianhillig-db commented on code in PR #45635: URL: https://github.com/apache/spark/pull/45635#discussion_r1542041741 ## core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala: ## @@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory( def create(): (PythonWorker, Option[Long]) = { if (useDaemon) { self.synchronized { -if (idleWorkers.nonEmpty) { +// Pull from idle workers until we one that is alive, otherwise create a new one. +while (idleWorkers.nonEmpty) { Review Comment: Ugh, sorry - the force push broke that link. I'm referring to "releaseWorker" using the same synchronization, so we should not be adding new workers while this code runs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47570][SS] Integrate range scan encoder changes with timer implementation [spark]
jingz-db commented on code in PR #45709: URL: https://github.com/apache/spark/pull/45709#discussion_r1541998642 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala: ## @@ -188,9 +187,12 @@ class TimerStateImpl( /** * Function to get all the registered timers for all grouping keys + * @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function Review Comment: Thanks for pointing this out (I did not realize this before but looks like we did the right thing)! We've also left out those implementation details in the user facing function `getExpiredTimers()` in `StatefulProcessorHandleimpl` here: https://github.com/jingz-db/spark/blob/36ab93519ae1ab43e20926ae47406cff668a3b10/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala#L166-L174 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
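A tiny illustration (not Spark code) of the threshold semantics being documented here: given timers ordered by expiry timestamp, return only those at or before `expiryTimestampMs`. Whether the real implementation uses `<=` or `<` is a detail of the PR.

```scala
// Simplified model of "get all registered timers expired relative to a threshold".
object ExpiredTimersSketch {
  final case class Timer(groupingKey: String, expiryTimestampMs: Long)

  def getExpiredTimers(registered: Seq[Timer], expiryTimestampMs: Long): Iterator[Timer] =
    // Relies on the timers being sorted by timestamp (what the range scan encoder
    // provides), so iteration can stop at the first non-expired entry.
    registered.iterator.takeWhile(_.expiryTimestampMs <= expiryTimestampMs)

  def main(args: Array[String]): Unit = {
    val timers = Seq(Timer("a", 100L), Timer("b", 200L), Timer("c", 300L)).sortBy(_.expiryTimestampMs)
    getExpiredTimers(timers, expiryTimestampMs = 200L).foreach(println)
  }
}
```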
Re: [PR] [SPARK-47565][PYTHON] PySpark worker pool crash resilience [spark]
ueshin commented on code in PR #45635: URL: https://github.com/apache/spark/pull/45635#discussion_r1541986448 ## core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala: ## @@ -95,10 +95,20 @@ private[spark] class PythonWorkerFactory( def create(): (PythonWorker, Option[Long]) = { if (useDaemon) { self.synchronized { -if (idleWorkers.nonEmpty) { +// Pull from idle workers until we one that is alive, otherwise create a new one. +while (idleWorkers.nonEmpty) { val worker = idleWorkers.dequeue() - worker.selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE) - return (worker, daemonWorkers.get(worker)) + val workerHandle = daemonWorkers(worker) Review Comment: Do we have a chance that this throws an exception? If so, should we use `daemonWorkers.get(worker)` same as before, just in case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
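For context on the question above, a minimal sketch contrasting the two lookups on a Scala mutable map: `apply` throws `NoSuchElementException` when the key is missing, while `get` returns an `Option` that can be handled explicitly. The `Worker` type is a stand-in, not the real `PythonWorker`.

```scala
import scala.collection.mutable

object WorkerLookupSketch {
  final case class Worker(id: Int)

  def main(args: Array[String]): Unit = {
    val daemonWorkers = mutable.Map(Worker(1) -> 1001L) // worker -> pid

    val known = Worker(1)
    val unknown = Worker(2)

    // Option-based lookup: absence is an ordinary value, not an exception.
    println(daemonWorkers.get(known))    // Some(1001)
    println(daemonWorkers.get(unknown))  // None

    // Direct apply throws NoSuchElementException when the key is missing.
    try daemonWorkers(unknown)
    catch { case e: NoSuchElementException => println(s"apply failed: ${e.getMessage}") }
  }
}
```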
Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]
chenhao-db commented on code in PR #45708: URL: https://github.com/apache/spark/pull/45708#discussion_r1541963979 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala: ## @@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends UnaryExpression override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) } + +// A path segment in the `VariantGet` expression. It represents either an object key access (when +// `key` is not null) or an array index access (when `key` is null). +case class PathSegment(key: String, index: Int) + +object VariantPathParser extends RegexParsers { + private def root: Parser[Char] = '$' + + // Parse index segment like `[123]`. + private def index: Parser[PathSegment] = +for { + index <- '[' ~> "\\d+".r <~ ']' +} yield { + PathSegment(null, index.toInt) +} + + // Parse key segment like `.name`, `['name']`, or `["name"]`. + private def key: Parser[PathSegment] = +for { + key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" | +"[\"" ~> "[^\\\"\\?]+".r <~ "\"]" +} yield { + PathSegment(key, 0) +} + + private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | index)) + + def parse(str: String): Option[Array[PathSegment]] = { +this.parseAll(parser, str) match { + case Success(result, _) => Some(result.toArray) + case _ => None +} + } +} + +/** + * The implementation for `variant_get` and `try_variant_get` expressions. Extracts a sub-variant + * value according to a path and cast it into a concrete data type. + * @param child The source variant value to extract from. + * @param path A literal path expression. It has the same format as the JSON path. + * @param schema The target data type to cast into. + * @param failOnError Controls whether the expression should throw an exception or return null if + *the cast fails. + * @param timeZoneId A string identifier of a time zone. It is required by timestamp-related casts. 
+ */ +case class VariantGet( +child: Expression, +path: Expression, +schema: DataType, +failOnError: Boolean, +timeZoneId: Option[String] = None) +extends BinaryExpression +with TimeZoneAwareExpression +with NullIntolerant +with ExpectsInputTypes +with CodegenFallback +with QueryErrorsBase { + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!path.foldable) { + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("path"), + "inputType" -> toSQLType(path.dataType), + "inputExpr" -> toSQLExpr(path) +) + ) +} else if (!VariantGet.checkDataType(schema)) { + DataTypeMismatch( +errorSubClass = "CAST_WITHOUT_SUGGESTION", +messageParameters = Map( + "srcType" -> toSQLType(VariantType), + "targetType" -> toSQLType(schema) +) + ) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override lazy val dataType: DataType = schema.asNullable + + @transient private lazy val parsedPath = { +val pathValue = path.eval().toString +VariantPathParser.parse(pathValue).getOrElse { + throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName) +} + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(VARIANT_GET) + + override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType) + + override def prettyName: String = if (failOnError) "variant_get" else "try_variant_get" + + override def nullable: Boolean = true + + protected override def nullSafeEval(input: Any, path: Any): Any = { Review Comment: In case you are interested, I have a draft for the manual codegn version. I think I can add it in a follow-up PR. Personally, I don't feel the code has any red flag, and it is much better than the `StaticInvoke` approach. ``` protected override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val childCode = child.genCode(ctx) val tmp = ctx.freshVariable("tmp", classOf[Object]) val parsedPathArg = ctx.addReferenceObj("parsedPath", parsedPath) val dataTypeArg = ctx.addReferenceObj("dataType", dataType) val zoneIdArg = ctx.addReferenceObj("zoneId", timeZoneId) val code = code""" ${childCode.code} boolean ${ev.isNull} = ${childCode.isNull}; ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { Object $tmp = org.apache.spark.sql.catalyst.expressions.variant.VariantGet.variantGet( ${childCode.value}, $parsedPathArg, $dataTypeArg, $failOnError, $zoneIdArg);
Re: [PR] [SPARK-47485][SQL][PYTHON][CONNECT] Create column with collations in dataframe API [spark]
MaxGekk closed pull request #45569: [SPARK-47485][SQL][PYTHON][CONNECT] Create column with collations in dataframe API URL: https://github.com/apache/spark/pull/45569 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47485][SQL][PYTHON][CONNECT] Create column with collations in dataframe API [spark]
MaxGekk commented on PR #45569: URL: https://github.com/apache/spark/pull/45569#issuecomment-2023774837 +1, LGTM. Merging to master. Thank you, @stefankandic and @HyukjinKwon for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]
chenhao-db commented on code in PR #45708: URL: https://github.com/apache/spark/pull/45708#discussion_r1541767305 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala: ## @@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends UnaryExpression override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) } + +// A path segment in the `VariantGet` expression. It represents either an object key access (when +// `key` is not null) or an array index access (when `key` is null). +case class PathSegment(key: String, index: Int) + +object VariantPathParser extends RegexParsers { + private def root: Parser[Char] = '$' + + // Parse index segment like `[123]`. + private def index: Parser[PathSegment] = +for { + index <- '[' ~> "\\d+".r <~ ']' +} yield { + PathSegment(null, index.toInt) +} + + // Parse key segment like `.name`, `['name']`, or `["name"]`. + private def key: Parser[PathSegment] = +for { + key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" | +"[\"" ~> "[^\\\"\\?]+".r <~ "\"]" +} yield { + PathSegment(key, 0) +} + + private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | index)) + + def parse(str: String): Option[Array[PathSegment]] = { +this.parseAll(parser, str) match { + case Success(result, _) => Some(result.toArray) + case _ => None +} + } +} + +/** + * The implementation for `variant_get` and `try_variant_get` expressions. Extracts a sub-variant + * value according to a path and cast it into a concrete data type. + * @param child The source variant value to extract from. + * @param path A literal path expression. It has the same format as the JSON path. + * @param schema The target data type to cast into. + * @param failOnError Controls whether the expression should throw an exception or return null if + *the cast fails. + * @param timeZoneId A string identifier of a time zone. It is required by timestamp-related casts. 
+ */ +case class VariantGet( +child: Expression, +path: Expression, +schema: DataType, +failOnError: Boolean, +timeZoneId: Option[String] = None) +extends BinaryExpression +with TimeZoneAwareExpression +with NullIntolerant +with ExpectsInputTypes +with CodegenFallback +with QueryErrorsBase { + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!path.foldable) { + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("path"), + "inputType" -> toSQLType(path.dataType), + "inputExpr" -> toSQLExpr(path) +) + ) +} else if (!VariantGet.checkDataType(schema)) { + DataTypeMismatch( +errorSubClass = "CAST_WITHOUT_SUGGESTION", +messageParameters = Map( + "srcType" -> toSQLType(VariantType), + "targetType" -> toSQLType(schema) +) + ) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override lazy val dataType: DataType = schema.asNullable + + @transient private lazy val parsedPath = { +val pathValue = path.eval().toString +VariantPathParser.parse(pathValue).getOrElse { + throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName) +} + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(VARIANT_GET) + + override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType) + + override def prettyName: String = if (failOnError) "variant_get" else "try_variant_get" + + override def nullable: Boolean = true + + protected override def nullSafeEval(input: Any, path: Any): Any = { Review Comment: I have done some experiments with the `StaticInvoke` approach. Suppose I have encapsulated the `VariantGet` implementation into the following function: ``` case object VariantGetCodegen { def variantGet(input: VariantVal, parsedPath: Array[PathSegment], dataType: DataType, failOnError: Boolean, zoneId: Option[String]): Any = {...} } ``` and make `VariantGet` a `RuntimeReplaceable` expression with a replacement of `StaticInvoke` that invokes `VariantGetCodegen.variantGet`. It still won't directly work because the codegen logic of `StaticInvoke` assumes the return type of the method directly matches the return type, but the return type of `VariantGetCodegen.variantGet` is `Any`. In order to make it work, I have to create a wrapper for each return type, like: ``` case object VariantGetCodegen { def variantGetByte(input: VariantVal, parsedPath: Array[PathSegment], dataType: DataType, failOnError: Boolean, zoneId: Option[String]): Byte =
Re: [PR] [SPARK-47492][SQL] Widen whitespace rules in lexer [spark]
srielau commented on PR #45620: URL: https://github.com/apache/spark/pull/45620#issuecomment-2023459267 > Is there any reason for choosing the current Unicode Character set? I saw that `u000B` is not included. @yaooqinn VT is not a whitespace character; it is a segment separator. Since it is traditionally not included (only TAB is commonly included), I don't want to overplay this. We can always revisit later. https://www.compart.com/en/unicode/bidiclass/S -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]
aarondav commented on code in PR #45150: URL: https://github.com/apache/spark/pull/45150#discussion_r1541567581 ## connector/connect/common/src/main/protobuf/spark/connect/base.proto: ## @@ -435,6 +438,16 @@ message ExecutePlanResponse { // the execution is complete. If the server sends onComplete without sending a ResultComplete, // it means that there is more, and the client should use ReattachExecute RPC to continue. } + + // This message is used to communicate progress about the query progress during the execution. + message ExecutionProgress { +int64 num_tasks = 1; +int64 num_completed_tasks = 2; Review Comment: (Just my 2c: I think having any progress bar is much better than none. The standard Spark progress bar has some ups and some downs, definitely having new progress bars appear isn't the most intuitive either. I think it's probably net better than one progress bar that gets longer, but I would much prefer having some progress bar now that we can extend later, perhaps as we get a better sense of how to incorporate AQE and future stages into the UX.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
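As a rough illustration of why even these two coarse counters are useful to a client, a text progress line can be rendered directly from them; the rendering below is hypothetical and not part of the PR.

```scala
// Render a simple progress line from the two int64 counters in ExecutionProgress.
def renderProgress(numCompletedTasks: Long, numTasks: Long, width: Int = 30): String = {
  if (numTasks <= 0) return "[no tasks reported yet]"
  val fraction = math.min(1.0, numCompletedTasks.toDouble / numTasks)
  val filled = (fraction * width).toInt
  val bar = "=" * filled + " " * (width - filled)
  f"[$bar] ${fraction * 100}%5.1f%% ($numCompletedTasks/$numTasks tasks)"
}

// renderProgress(42, 168)  // => "[=======                       ]  25.0% (42/168 tasks)"
```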
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
anishshri-db commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1541540920 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateVariableWithTTLSupport.scala: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.time.Duration + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.streaming.TTLMode +import org.apache.spark.sql.types.{BinaryType, DataType, LongType, NullType, StructField, StructType} + +object StateTTLSchema { + val KEY_ROW_SCHEMA: StructType = new StructType() +.add("expirationMs", LongType) +.add("groupingKey", BinaryType) + val VALUE_ROW_SCHEMA: StructType = +StructType(Array(StructField("__dummy__", NullType))) +} + +/** + * Encapsulates the ttl row information stored in [[SingleKeyTTLState]]. + * @param groupingKey grouping key for which ttl is set + * @param expirationMs expiration time for the grouping key + */ +case class SingleKeyTTLRow( +groupingKey: Array[Byte], +expirationMs: Long) + +/** + * Represents a State variable which supports TTL. + */ +trait StateVariableWithTTLSupport { + + /** + * Clears the user state associated with this grouping key + * if it has expired. This function is called by Spark to perform + * cleanup at the end of transformWithState processing. + * + * Spark uses a secondary index to determine if the user state for + * this grouping key has expired. However, its possible that the user + * has updated the TTL and secondary index is out of date. Implementations + * must validate that the user State has actually expired before cleanup based + * on their own State data. + * + * @param groupingKey grouping key for which cleanup should be performed. + */ + def clearIfExpired(groupingKey: Array[Byte]): Unit +} + +/** + * Represents the underlying state for secondary TTL Index for a user defined + * state variable. + * + * This state allows Spark to query ttl values based on expiration time + * allowing efficient ttl cleanup. + */ +trait TTLState { + + /** + * Perform the user state clean yp based on ttl values stored in Review Comment: nit: `Perform the user state clean up based` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
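To make the contract described in the scaladoc above concrete, here is a sketch of how a state variable might implement `clearIfExpired`, re-checking its own copy of the expiration before deleting because the secondary TTL index can be stale. The types and the in-memory map are illustrative only, not the PR's implementation.

```scala
import scala.collection.mutable

object ClearIfExpiredSketch {
  // Illustrative primary-state record that carries its own expiration timestamp.
  final case class ValueWithTTL[V](value: V, expirationMs: Long)

  final class ValueStateWithTTL[V](clock: () => Long) {
    private val primary = mutable.Map.empty[Seq[Byte], ValueWithTTL[V]]

    def update(groupingKey: Array[Byte], value: V, ttlMs: Long): Unit =
      primary(groupingKey.toSeq) = ValueWithTTL(value, clock() + ttlMs)

    // Called by the framework with keys coming from the secondary TTL index.
    // The index may be out of date (e.g. the value was re-set with a later TTL),
    // so only delete when the primary copy really has expired.
    def clearIfExpired(groupingKey: Array[Byte]): Unit = {
      val key = groupingKey.toSeq
      primary.get(key).foreach { v =>
        if (v.expirationMs <= clock()) primary.remove(key)
      }
    }
  }
}
```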
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
anishshri-db commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1541540421 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateVariableWithTTLSupport.scala: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.time.Duration + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, StateStore} +import org.apache.spark.sql.streaming.TTLMode +import org.apache.spark.sql.types.{BinaryType, DataType, LongType, NullType, StructField, StructType} + +object StateTTLSchema { + val KEY_ROW_SCHEMA: StructType = new StructType() +.add("expirationMs", LongType) +.add("groupingKey", BinaryType) + val VALUE_ROW_SCHEMA: StructType = +StructType(Array(StructField("__dummy__", NullType))) +} + +/** + * Encapsulates the ttl row information stored in [[SingleKeyTTLState]]. + * @param groupingKey grouping key for which ttl is set + * @param expirationMs expiration time for the grouping key + */ +case class SingleKeyTTLRow( +groupingKey: Array[Byte], +expirationMs: Long) + +/** + * Represents a State variable which supports TTL. + */ +trait StateVariableWithTTLSupport { + + /** + * Clears the user state associated with this grouping key + * if it has expired. This function is called by Spark to perform + * cleanup at the end of transformWithState processing. + * + * Spark uses a secondary index to determine if the user state for + * this grouping key has expired. However, its possible that the user + * has updated the TTL and secondary index is out of date. Implementations + * must validate that the user State has actually expired before cleanup based + * on their own State data. + * + * @param groupingKey grouping key for which cleanup should be performed. + */ + def clearIfExpired(groupingKey: Array[Byte]): Unit +} + +/** + * Represents the underlying state for secondary TTL Index for a user defined + * state variable. + * + * This state allows Spark to query ttl values based on expiration time + * allowing efficient ttl cleanup. + */ +trait TTLState { + + /** + * Perform the user state clean yp based on ttl values stored in + * this state. NOTE that its not safe to call this operation concurrently + * when the user can also modify the underlying State. Cleanup should be initiated + * after arbitrary state operations are completed by the user. + */ + def clearExpiredState(): Unit +} + +/** + * Manages the ttl information for user state keyed with a single key (grouping key). 
+ */ +class SingleKeyTTLState( Review Comment: nit: should we call this `SingleKeyTTLStateImpl` to be consistent ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]
chenhao-db commented on code in PR #45708: URL: https://github.com/apache/spark/pull/45708#discussion_r1541513308 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala: ## @@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends UnaryExpression override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) } + +// A path segment in the `VariantGet` expression. It represents either an object key access (when +// `key` is not null) or an array index access (when `key` is null). +case class PathSegment(key: String, index: Int) + +object VariantPathParser extends RegexParsers { + private def root: Parser[Char] = '$' + + // Parse index segment like `[123]`. + private def index: Parser[PathSegment] = +for { + index <- '[' ~> "\\d+".r <~ ']' +} yield { + PathSegment(null, index.toInt) +} + + // Parse key segment like `.name`, `['name']`, or `["name"]`. + private def key: Parser[PathSegment] = +for { + key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" | +"[\"" ~> "[^\\\"\\?]+".r <~ "\"]" +} yield { + PathSegment(key, 0) +} + + private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | index)) + + def parse(str: String): Option[Array[PathSegment]] = { +this.parseAll(parser, str) match { + case Success(result, _) => Some(result.toArray) + case _ => None +} + } +} + +/** + * The implementation for `variant_get` and `try_variant_get` expressions. Extracts a sub-variant + * value according to a path and cast it into a concrete data type. + * @param child The source variant value to extract from. + * @param path A literal path expression. It has the same format as the JSON path. + * @param schema The target data type to cast into. + * @param failOnError Controls whether the expression should throw an exception or return null if + *the cast fails. + * @param timeZoneId A string identifier of a time zone. It is required by timestamp-related casts. 
+ */ +case class VariantGet( +child: Expression, +path: Expression, +schema: DataType, +failOnError: Boolean, +timeZoneId: Option[String] = None) +extends BinaryExpression +with TimeZoneAwareExpression +with NullIntolerant +with ExpectsInputTypes +with CodegenFallback +with QueryErrorsBase { + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!path.foldable) { + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("path"), + "inputType" -> toSQLType(path.dataType), + "inputExpr" -> toSQLExpr(path) +) + ) +} else if (!VariantGet.checkDataType(schema)) { + DataTypeMismatch( +errorSubClass = "CAST_WITHOUT_SUGGESTION", +messageParameters = Map( + "srcType" -> toSQLType(VariantType), + "targetType" -> toSQLType(schema) +) + ) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override lazy val dataType: DataType = schema.asNullable + + @transient private lazy val parsedPath = { +val pathValue = path.eval().toString +VariantPathParser.parse(pathValue).getOrElse { + throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName) +} + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(VARIANT_GET) + + override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType) + + override def prettyName: String = if (failOnError) "variant_get" else "try_variant_get" + + override def nullable: Boolean = true + + protected override def nullSafeEval(input: Any, path: Any): Any = { Review Comment: I didn't mean writing everything by hand. Essentially, we create a method that implements `VariantGet`, and the class only needs some boilerplate code to call this method (similar to the code in `StaticInvoke` itself). There is still another reason why I don't like `StaticInvoke`. In the future, I will write some optimizer rules on `VariantGet` (e.g., to push it down a join). This is why I added a new `TreePattern` `VARIANT_GET` in this PR. The optimizer rule will run after `RuntimeReplaceable` expression is replaced, so it will become `StaticInvoke` and no longer has this tree pattern, and the optimizer rule can no longer prune expressions. Plus, matching against `StaticInvoke` is also more complex than matching against `VariantGet`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure
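For readers who have not worked with tree patterns: keeping a dedicated `VARIANT_GET` pattern on the expression lets a later optimizer rule prune cheaply and match the case class directly. A hypothetical rule skeleton in the usual Catalyst style, where the rewrite itself is only a placeholder, not a proposed optimization:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.variant.VariantGet
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.VARIANT_GET

// Hypothetical rule: subtrees without the VARIANT_GET pattern are skipped entirely,
// and the match is on the VariantGet case class rather than on a StaticInvoke call.
object PushDownVariantGet extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan =
    plan.transformAllExpressionsWithPruning(_.containsPattern(VARIANT_GET)) {
      case v: VariantGet => rewrite(v)
    }

  private def rewrite(v: VariantGet): Expression = v // placeholder for the real rewrite
}
```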
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
anishshri-db commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1541504006 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -77,14 +78,20 @@ class StatefulProcessorHandleImpl( store: StateStore, runId: UUID, keyEncoder: ExpressionEncoder[Any], +ttlMode: TTLMode, timeoutMode: TimeoutMode, -isStreaming: Boolean = true) +isStreaming: Boolean = true, +batchTimestampMs: Option[Long] = None, +eventTimeWatermarkMs: Option[Long] = None) extends StatefulProcessorHandle with Logging { import StatefulProcessorHandleState._ + private val ttlStates: util.List[TTLState] = new util.ArrayList[TTLState]() Review Comment: Could we add a comment for what this list is storing ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
anishshri-db commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1541502777 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala: ## @@ -62,33 +70,79 @@ class StateTypesEncoder[GK, V]( private val rowToObjDeserializer = valExpressionEnc.resolveAndBind().createDeserializer() private val reusedValRow = new UnsafeRow(valEncoder.schema.fields.length) + private val NO_TTL_ENCODED_VALUE: Long = -1L Review Comment: Could we avoid recording this explicitly ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47558][SS] State TTL support for ValueState [spark]
anishshri-db commented on code in PR #45674: URL: https://github.com/apache/spark/pull/45674#discussion_r1541502063 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala: ## @@ -23,11 +23,19 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.Serializer import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} import org.apache.spark.sql.execution.streaming.state.StateStoreErrors -import org.apache.spark.sql.types.{BinaryType, StructType} +import org.apache.spark.sql.types.{BinaryType, LongType, StructType} object StateKeyValueRowSchema { val KEY_ROW_SCHEMA: StructType = new StructType().add("key", BinaryType) - val VALUE_ROW_SCHEMA: StructType = new StructType().add("value", BinaryType) + val VALUE_ROW_SCHEMA: StructType = new StructType() +.add("value", BinaryType) +.add("ttlExpirationMs", LongType) Review Comment: Would we add a long even if ttl is disabled ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
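For illustration of the trade-off behind the two questions above (the `-1` sentinel and the always-present long), here are the two obvious encodings of an optional expiration on the value row; neither snippet claims to be the PR's final choice.

```scala
import org.apache.spark.sql.types.{BinaryType, LongType, StructType}

// Option A (as in the diff): the column is always present and a sentinel such as -1
// means "no TTL set for this value".
val valueSchemaWithSentinel = new StructType()
  .add("value", BinaryType)
  .add("ttlExpirationMs", LongType, nullable = false)

// Option B: keep the column nullable and store null when TTL is disabled,
// trading the magic value for a null check on every read.
val valueSchemaNullable = new StructType()
  .add("value", BinaryType)
  .add("ttlExpirationMs", LongType, nullable = true)
```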
Re: [PR] [SPARK-47081][CONNECT] Support Query Execution Progress [spark]
cloud-fan commented on code in PR #45150: URL: https://github.com/apache/spark/pull/45150#discussion_r1541466356 ## connector/connect/common/src/main/protobuf/spark/connect/base.proto: ## @@ -435,6 +438,16 @@ message ExecutePlanResponse { // the execution is complete. If the server sends onComplete without sending a ResultComplete, // it means that there is more, and the client should use ReattachExecute RPC to continue. } + + // This message is used to communicate progress about the query progress during the execution. + message ExecutionProgress { +int64 num_tasks = 1; +int64 num_completed_tasks = 2; Review Comment: I agree we don't have to be very accurate, but is there a reason that we need to have a different progress bar style for Spark Connect? Is it because the classic Spark "stage by stage" progress bar is bad, or it's hard to implement? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]
cloud-fan commented on code in PR #45708: URL: https://github.com/apache/spark/pull/45708#discussion_r1541456694 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala: ## @@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends UnaryExpression override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) } + +// A path segment in the `VariantGet` expression. It represents either an object key access (when +// `key` is not null) or an array index access (when `key` is null). +case class PathSegment(key: String, index: Int) + +object VariantPathParser extends RegexParsers { + private def root: Parser[Char] = '$' + + // Parse index segment like `[123]`. + private def index: Parser[PathSegment] = +for { + index <- '[' ~> "\\d+".r <~ ']' +} yield { + PathSegment(null, index.toInt) +} + + // Parse key segment like `.name`, `['name']`, or `["name"]`. + private def key: Parser[PathSegment] = +for { + key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" | +"[\"" ~> "[^\\\"\\?]+".r <~ "\"]" +} yield { + PathSegment(key, 0) +} + + private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | index)) + + def parse(str: String): Option[Array[PathSegment]] = { +this.parseAll(parser, str) match { + case Success(result, _) => Some(result.toArray) + case _ => None +} + } +} + +/** + * The implementation for `variant_get` and `try_variant_get` expressions. Extracts a sub-variant + * value according to a path and cast it into a concrete data type. + * @param child The source variant value to extract from. + * @param path A literal path expression. It has the same format as the JSON path. + * @param schema The target data type to cast into. + * @param failOnError Controls whether the expression should throw an exception or return null if + *the cast fails. + * @param timeZoneId A string identifier of a time zone. It is required by timestamp-related casts. 
+ */ +case class VariantGet( +child: Expression, +path: Expression, +schema: DataType, +failOnError: Boolean, +timeZoneId: Option[String] = None) +extends BinaryExpression +with TimeZoneAwareExpression +with NullIntolerant +with ExpectsInputTypes +with CodegenFallback +with QueryErrorsBase { + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!path.foldable) { + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("path"), + "inputType" -> toSQLType(path.dataType), + "inputExpr" -> toSQLExpr(path) +) + ) +} else if (!VariantGet.checkDataType(schema)) { + DataTypeMismatch( +errorSubClass = "CAST_WITHOUT_SUGGESTION", +messageParameters = Map( + "srcType" -> toSQLType(VariantType), + "targetType" -> toSQLType(schema) +) + ) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override lazy val dataType: DataType = schema.asNullable + + @transient private lazy val parsedPath = { +val pathValue = path.eval().toString +VariantPathParser.parse(pathValue).getOrElse { + throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName) +} + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(VARIANT_GET) + + override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType) + + override def prettyName: String = if (failOnError) "variant_get" else "try_variant_get" + + override def nullable: Boolean = true + + protected override def nullSafeEval(input: Any, path: Any): Any = { Review Comment: You can pass anything to `StaticInvoke`, including arbitrary java object, using `Literal` with `ObjectType`. I'm against writing codegen by hand, as it's hard to debug, and error-prone (maybe inconsistent with the interpreted implementation). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
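A minimal sketch of the pattern being suggested: wrap a pre-computed JVM object in a `Literal` of `ObjectType` so that a `StaticInvoke` replacement can receive it as an argument. `PathHolder` and `VariantGetImpl` below are hypothetical stand-ins; only the `Literal`/`ObjectType`/`StaticInvoke` wiring is the point.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.types.{ObjectType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical carrier for the pre-parsed path, plus a hypothetical implementation holder.
final case class PathHolder(segments: Array[String])
object VariantGetImpl {
  def eval(input: UTF8String, path: PathHolder): UTF8String = input // placeholder logic
}

def buildReplacement(child: Expression, parsedPath: PathHolder): Expression = {
  // Arbitrary JVM objects can be passed as arguments via Literal + ObjectType.
  val pathArg = Literal(parsedPath, ObjectType(classOf[PathHolder]))
  StaticInvoke(
    VariantGetImpl.getClass,  // class whose object method is invoked during codegen
    StringType,               // SQL result type of the call
    "eval",
    Seq(child, pathArg))
}
```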
Re: [PR] [SPARK-47551][SQL] Add variant_get expression. [spark]
chenhao-db commented on code in PR #45708: URL: https://github.com/apache/spark/pull/45708#discussion_r1541443869 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala: ## @@ -63,3 +70,300 @@ case class ParseJson(child: Expression) extends UnaryExpression override protected def withNewChildInternal(newChild: Expression): ParseJson = copy(child = newChild) } + +// A path segment in the `VariantGet` expression. It represents either an object key access (when +// `key` is not null) or an array index access (when `key` is null). +case class PathSegment(key: String, index: Int) + +object VariantPathParser extends RegexParsers { + private def root: Parser[Char] = '$' + + // Parse index segment like `[123]`. + private def index: Parser[PathSegment] = +for { + index <- '[' ~> "\\d+".r <~ ']' +} yield { + PathSegment(null, index.toInt) +} + + // Parse key segment like `.name`, `['name']`, or `["name"]`. + private def key: Parser[PathSegment] = +for { + key <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']" | +"[\"" ~> "[^\\\"\\?]+".r <~ "\"]" +} yield { + PathSegment(key, 0) +} + + private val parser: Parser[List[PathSegment]] = phrase(root ~> rep(key | index)) + + def parse(str: String): Option[Array[PathSegment]] = { +this.parseAll(parser, str) match { + case Success(result, _) => Some(result.toArray) + case _ => None +} + } +} + +/** + * The implementation for `variant_get` and `try_variant_get` expressions. Extracts a sub-variant + * value according to a path and cast it into a concrete data type. + * @param child The source variant value to extract from. + * @param path A literal path expression. It has the same format as the JSON path. + * @param schema The target data type to cast into. + * @param failOnError Controls whether the expression should throw an exception or return null if + *the cast fails. + * @param timeZoneId A string identifier of a time zone. It is required by timestamp-related casts. 
+ */ +case class VariantGet( +child: Expression, +path: Expression, +schema: DataType, +failOnError: Boolean, +timeZoneId: Option[String] = None) +extends BinaryExpression +with TimeZoneAwareExpression +with NullIntolerant +with ExpectsInputTypes +with CodegenFallback +with QueryErrorsBase { + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!path.foldable) { + DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("path"), + "inputType" -> toSQLType(path.dataType), + "inputExpr" -> toSQLExpr(path) +) + ) +} else if (!VariantGet.checkDataType(schema)) { + DataTypeMismatch( +errorSubClass = "CAST_WITHOUT_SUGGESTION", +messageParameters = Map( + "srcType" -> toSQLType(VariantType), + "targetType" -> toSQLType(schema) +) + ) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override lazy val dataType: DataType = schema.asNullable + + @transient private lazy val parsedPath = { +val pathValue = path.eval().toString +VariantPathParser.parse(pathValue).getOrElse { + throw QueryExecutionErrors.invalidVariantGetPath(pathValue, prettyName) +} + } + + final override def nodePatternsInternal(): Seq[TreePattern] = Seq(VARIANT_GET) + + override def inputTypes: Seq[AbstractDataType] = Seq(VariantType, StringType) + + override def prettyName: String = if (failOnError) "variant_get" else "try_variant_get" + + override def nullable: Boolean = true + + protected override def nullSafeEval(input: Any, path: Any): Any = { Review Comment: There is yet another reason against using `StaticInvoke`. The path parameter must be an literal, and I can make use of this requirement to avoid repeated path parsing. However, I cannot find how to do a similar caching in `StaticInvoke`. Using `StaticInvoke` won't simplify the current implementation. It can indeed simplify the implementation if we want to support native codegen rather than depending on `CodegenFallback`. I think that is an optional optimization we can do in the future, but I would prefer manually writing the codegen for `VariantGet`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands,
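Since the path has to be a foldable literal, it is parsed exactly once into segments; from the grammar shown in the diff, the mapping looks roughly like this (expected values inferred from the parser, not copied from the PR's tests):

```scala
// Object keys become PathSegment(key, 0); array indices become PathSegment(null, index).
VariantPathParser.parse("$.a[2]['b']")
// => Some(Array(PathSegment("a", 0), PathSegment(null, 2), PathSegment("b", 0)))

VariantPathParser.parse("""$["name"]""")
// => Some(Array(PathSegment("name", 0)))

VariantPathParser.parse("a.b")   // missing the '$' root
// => None
```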
Re: [PR] [SPARK-46840][SQL][TESTS] Add `CollationBenchmark` [spark]
GideonPotok commented on code in PR #45453: URL: https://github.com/apache/spark/pull/45453#discussion_r1541440974 ## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala: ## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +import scala.concurrent.duration._ + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.functions._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Benchmark to measure performance for comparisons between collated strings. To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class + *--jars , + * 2. build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.CollationBenchmark" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain " + * Results will be written to "benchmarks/CollationBenchmark-results.txt". 
+ * }}} + */ + +object CollationBenchmark extends SqlBasedBenchmark { + private val collationTypes = Seq("UTF8_BINARY_LCASE", "UNICODE", "UTF8_BINARY", "UNICODE_CI") + + def generateSeqInput(n: Long): Seq[UTF8String] = { +val input = Seq("ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", + "GHI", "ghi", "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", + "ABC", "ABC", "aBC", "aBC", "abc", "abc", "DEF", "DEF", "def", "def", "GHI", "ghi", + "JKL", "jkl", "MNO", "mno", "PQR", "pqr", "STU", "stu", "VWX", "vwx", "YZ") + .map(UTF8String.fromString) +val inputLong: Seq[UTF8String] = (0L until n).map(i => input(i.toInt % input.size)) +inputLong + } + + private def getDataFrame(strings: Seq[String]): DataFrame = { +val asPairs = strings.sliding(2, 1).toSeq.map { + case Seq(s1, s2) => (s1, s2) +} +val d = spark.createDataFrame(asPairs).toDF("s1", "s2") +d + } + + private def generateDataframeInput(l: Long): DataFrame = { +getDataFrame(generateSeqInput(l).map(_.toString)) + } + + def benchmarkUTFStringEquals(collationTypes: Seq[String], utf8Strings: Seq[UTF8String]): Unit = { +val sublistStrings = utf8Strings + +val benchmark = new Benchmark( + "collation unit benchmarks - equalsFunction", + utf8Strings.size * 10, + warmupTime = 4.seconds, + output = output) +collationTypes.foreach(collationType => { + val collation = CollationFactory.fetchCollation(collationType) + benchmark.addCase(s"$collationType") { _ => +sublistStrings.foreach(s1 => + utf8Strings.foreach(s => +(0 to 10).foreach(_ => + collation.equalsFunction(s, s1).booleanValue()) + ) +) + } +} +) +benchmark.run() + } + def benchmarkUTFStringCompare(collationTypes: Seq[String], utf8Strings: Seq[UTF8String]): Unit = { +val sublistStrings = utf8Strings + +val benchmark = new Benchmark( + "collation unit benchmarks - compareFunction", + utf8Strings.size * 10, + warmupTime = 4.seconds, + output = output) +collationTypes.foreach(collationType => { + val collation = CollationFactory.fetchCollation(collationType) + benchmark.addCase(s"$collationType") { _ => +sublistStrings.foreach(s1 => + utf8Strings.foreach(s => +(0 to 10).foreach(_ => + collation.comparator.compare(s, s1) +) + ) +) + } +} +) +benchmark.run() + } + + def benchmarkUTFStringHashFunction( + collationTypes: Seq[String], + utf8Strings: Seq[UTF8String]): Unit = { +val sublistStrings = utf8Strings + +val benchmark = new Benchmark( + "collation unit benchmarks - hashFunction", + utf8Strings.size * 10, + warmupTime = 4.seconds, + output = output) +collationTypes.foreach(collationType => { + val collation = CollationFactory.fetchCollation(collationType) + benchmark.addCase(s"$collationType") { _ => +sublistStrings.foreach(_ => + utf8Strings.foreach(s =>
[PR] [SPARK-47617] Add TPC-DS testing infrastructure for collations [spark]
nikolamand-db opened a new pull request, #45739: URL: https://github.com/apache/spark/pull/45739 ### What changes were proposed in this pull request? **PR branch is currently based off of https://github.com/apache/spark/pull/45383 because implicit casting is required for the checks to work. Will resolve with master once the dependency PR is merged.** We can utilize TPC-DS testing infrastructure already present in Spark. The idea is to vary TPC-DS table string columns by adding multiple collations with different ordering rules and case sensitivity, producing new tables. These tables should yield the same results against predefined TPC-DS queries for certain batches of collations. For example, when comparing query runs on table where columns are first collated as `UTF8_BINARY` and then as `UTF8_BINARY_LCASE`, we should be getting same results after converting to lowercase. Introduce new query suite which tests the described behavior with available collations (utf8_binary and unicode) combined with case conversions (lowercase, uppercase, randomized case for fuzzy testing). ### Why are the changes needed? Improve collations testing coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added TPC-DS collations query suite. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
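A condensed sketch of the consistency check described above: run the same query against a binary-collated and a lowercase-collated copy of a table and require equal results after folding strings to lowercase. The table names, and the assumption that the two variants differ only in column collation, are illustrative rather than taken from the suite.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, lower}
import org.apache.spark.sql.types.StringType

// Hypothetical check: a TPC-DS query should produce the same rows over the
// UTF8_BINARY and UTF8_BINARY_LCASE variants of a table once strings are lowercased.
def assertCollationAgnostic(spark: SparkSession, queryFor: String => String): Unit = {
  def normalizedRows(table: String): Array[Row] = {
    val df = spark.sql(queryFor(table))
    // Lowercase every string column (collated or not) so ordering/case differences vanish.
    val lowered = df.schema.fields.foldLeft(df) {
      case (acc, f) if f.dataType.isInstanceOf[StringType] =>
        acc.withColumn(f.name, lower(col(f.name)))
      case (acc, _) => acc
    }
    lowered.sort(lowered.columns.map(col).toSeq: _*).collect()
  }
  val binary = normalizedRows("store_sales_utf8_binary") // hypothetical table name
  val lcase = normalizedRows("store_sales_utf8_lcase")   // hypothetical table name
  assert(binary.sameElements(lcase), "collation variants diverged")
}
```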