Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517356903 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. 
+ */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) + +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +dataOut.flush() + +dataIn = new DataIn
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517356072 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def initialOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" + +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "initialOffset"}, +) + +def latestOffset(self) -> dict: +""" +Returns the most recent offset available. + +Returns +--- +dict +A dict or recursive dict whose key and value are primitive types, which includes +Integer, String and Boolean. + +Examples + +>>> def latestOffset(self): +... return {"parititon-1": {"index": 3, "closed": True}, "partition-2": {"index": 5}} +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "latestOffset"}, +) + +def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]: +""" +Returns a list of InputPartition given the start and end offsets. Each InputPartition +represents a data split that can be processed by one Spark task. + +Parameters +-- +start : dict +The start offset of the microbatch to plan partitioning. 
+end : dict +The end offset of the microbatch to plan partitioning. + +Returns +--- +Sequence[InputPartition] +A sequence of partitions for this data source. Each partition value +must be an instance of `InputPartition` or a subclass of it. +""" +... +raise PySparkNotImplementedError( +error_class="NOT_IMPLEMENTED", +message_parameters={"feature": "partitions"}, +) + +@abstractmethod +def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: Review Comment: I copied the interface from the batch Python data source. Why is this Iterator[Union] instead of Union[Iterator]? @HyukjinKwon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
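To make the four methods in the quoted diff concrete, here is a minimal sketch of a reader that follows the same method names. It is an illustration only: `CounterStreamReader` and `RangePartition` are hypothetical stand-ins that do not import pyspark, and the offset layout (`{"index": n}`) is an assumption chosen to match the "dict of primitive types" contract in the docstrings. On the typing question raised in the comment: `Iterator[Union[Tuple, Row]]` describes one iterator whose individual elements may each be a tuple or a `Row`, whereas `Union[Iterator[Tuple], Iterator[Row]]` would describe an iterator that is entirely one or entirely the other.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Tuple


@dataclass
class RangePartition:
    """Hypothetical stand-in for an InputPartition subclass."""
    start: int
    end: int


class CounterStreamReader:
    """Hypothetical reader that emits consecutive integers.

    It mirrors the method names of DataSourceStreamReader from the diff,
    but it is plain Python so that it can run without Spark.
    """

    def __init__(self, rows_per_batch: int = 4, num_partitions: int = 2) -> None:
        self.rows_per_batch = rows_per_batch
        self.num_partitions = num_partitions
        self.current = 0

    def initialOffset(self) -> Dict[str, int]:
        # A new query starts reading from here.
        return {"index": 0}

    def latestOffset(self) -> Dict[str, int]:
        # Pretend rows_per_batch new rows arrived since the last call.
        self.current += self.rows_per_batch
        return {"index": self.current}

    def partitions(self, start: Dict[str, int], end: Dict[str, int]) -> List[RangePartition]:
        # Split [start, end) into at most num_partitions contiguous ranges.
        lo, hi = start["index"], end["index"]
        step = max(1, -(-(hi - lo) // self.num_partitions))  # ceiling division
        return [RangePartition(s, min(s + step, hi)) for s in range(lo, hi, step)]

    def read(self, partition: RangePartition) -> Iterator[Tuple[int]]:
        # One iterator per partition; each element is a single-column row.
        for i in range(partition.start, partition.end):
            yield (i,)
```

A microbatch in this sketch is planned by calling `partitions(start, end)` with two offsets and then calling `read` once per returned partition.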
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HyukjinKwon commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517351498 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. 
+ val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. + */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { Review Comment: Could either @rangadi or @WweiL take a look? This is somewhat similar to foreachBatch.
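The class comment in the quoted diff describes a worker that loops, reads a function ID from the socket, calls the matching reader method, and writes the result back. A rough sketch of that dispatch loop follows. Only the four ID constants come from the diff. Everything else is an assumption made for illustration: the helper names (`read_int`, `write_json`, `serve`), the `FakeReader` class, the `commit` method name, and the length-prefixed JSON framing for parameters and results. The actual worker in the PR may frame and serialize data differently. The 4-byte big-endian integer matches what Java's `DataOutputStream.writeInt` produces.

```python
import io
import json
import struct

# Function IDs taken from the PythonStreamingSourceRunner companion object.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885
PARTITIONS_FUNC_ID = 886
COMMIT_FUNC_ID = 887


def read_int(stream) -> int:
    """Read a 4-byte big-endian int, as written by DataOutputStream.writeInt."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream closed")
    return struct.unpack(">i", data)[0]


def write_int(value: int, stream) -> None:
    stream.write(struct.pack(">i", value))


def read_json(stream):
    """Hypothetical framing: length prefix followed by UTF-8 JSON."""
    length = read_int(stream)
    return json.loads(stream.read(length).decode("utf-8"))


def write_json(obj, stream) -> None:
    payload = json.dumps(obj).encode("utf-8")
    write_int(len(payload), stream)
    stream.write(payload)


class FakeReader:
    """Hypothetical reader used only to exercise the loop."""

    def __init__(self) -> None:
        self.committed = []

    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self):
        return {"index": 2}

    def partitions(self, start, end):
        return [{"start": start["index"], "end": end["index"]}]

    def commit(self, end):
        self.committed.append(end)


def serve(reader, infile, outfile) -> None:
    """Dispatch on function IDs until the input stream is exhausted."""
    while True:
        try:
            func_id = read_int(infile)
        except EOFError:
            return
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_json(reader.initialOffset(), outfile)
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_json(reader.latestOffset(), outfile)
        elif func_id == PARTITIONS_FUNC_ID:
            start = read_json(infile)
            end = read_json(infile)
            write_json(reader.partitions(start, end), outfile)
        elif func_id == COMMIT_FUNC_ID:
            reader.commit(read_json(infile))
            write_int(0, outfile)  # hypothetical acknowledgement
        else:
            raise ValueError(f"unknown function id: {func_id}")
```

In this sketch `io.BytesIO` objects can stand in for the two socket streams, which keeps the loop testable without starting a JVM or a worker process.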
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HyukjinKwon commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517348253 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: The IDE doesn't seem to catch this case. As Jungtaek said, these checks happen at runtime, so it would be difficult to prevent this statically.
[PR] [WIP] Issue to fix foreachbatch persist issue for stateful queries [spark]
anishshri-db opened a new pull request, #45432: URL: https://github.com/apache/spark/pull/45432

### What changes were proposed in this pull request?
Issue to fix foreachbatch persist issue for stateful queries

### Why are the changes needed?
This allows us to prevent stateful operators from reloading state on every foreachbatch invocation

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests
```
[info] Run completed in 20 seconds, 898 milliseconds.
[info] Total number of tests run: 11
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]
HyukjinKwon commented on code in PR #45290: URL: https://github.com/apache/spark/pull/45290#discussion_r1517309692 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase { } } + test("aggregates count respects collation") { +Seq( + ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))), Review Comment: cc @dongjoon-hyun too
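The test row quoted above expects `COUNT` grouped by a `ucs_basic` string to keep "AAA" and "aaa" in separate groups. The idea can be sketched in plain Python by grouping on a collation key. This is a model, not Spark code: `count_by_collation` and `COLLATION_KEYS` are hypothetical names, the case-insensitive collation is approximated with `str.casefold` (an assumption, since the real comparison rules are not shown in this thread), and reporting the first-seen value as the group representative is also an assumption.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical mapping from collation name to a grouping key function.
COLLATION_KEYS: Dict[str, Callable[[str], str]] = {
    "ucs_basic": lambda s: s,      # binary comparison: the string is its own key
    "unicode_ci": str.casefold,    # rough approximation of a case-insensitive collation
}


def count_by_collation(values: List[str], collation: str) -> List[Tuple[int, str]]:
    """Return (count, representative) per group, in first-seen order."""
    key_fn = COLLATION_KEYS[collation.lower()]
    groups: Dict[str, list] = {}
    for value in values:
        key = key_fn(value)
        if key in groups:
            groups[key][0] += 1
        else:
            groups[key] = [1, value]
    return [(count, representative) for count, representative in groups.values()]
```

With `["AAA", "aaa"]` and `"ucs_basic"` this yields two groups of one row each, which matches the expected rows in the quoted test. Under the approximated case-insensitive key the two values fall into a single group.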
Re: [PR] [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed` [spark]
HyukjinKwon commented on PR #45431: URL: https://github.com/apache/spark/pull/45431#issuecomment-1985167614 cc @cloud-fan
[PR] [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed` [spark]
zhengruifeng opened a new pull request, #45431: URL: https://github.com/apache/spark/pull/45431

### What changes were proposed in this pull request?
Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed`

### Why are the changes needed?
`withColumnsRenamed` checks the output dataframe for duplicated column names, which is not consistent with `withColumnRenamed`:
1. `withColumnRenamed` doesn't do this check, and supports outputting a dataframe with duplicated column names;
2. when the input dataframe has duplicated column names, `withColumnsRenamed` always fails, even if the columns with the same name are not touched at all:

```
In [8]: df1 = spark.createDataFrame([(1, "id2"),], ["id", "value"])
   ...: df2 = spark.createDataFrame([(1, 'x', 'id1'), ], ["id", 'a', "value"])
   ...: join = df2.join(df1, on=['id'], how='left')
   ...: join
Out[8]: DataFrame[id: bigint, a: string, value: string, value: string]

In [9]: join.withColumnRenamed('id', 'value')
Out[9]: DataFrame[value: bigint, a: string, value: string, value: string]

In [10]: join.withColumnsRenamed({'id' : 'value'})
...
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711

In [11]: join.withColumnRenamed('a', 'b')
Out[11]: DataFrame[id: bigint, b: string, value: string, value: string]

In [12]: join.withColumnsRenamed({'a' : 'b'})
...
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711

In [13]: join.withColumnRenamed('x', 'y')
Out[13]: DataFrame[id: bigint, a: string, value: string, value: string]

In [14]: join.withColumnsRenamed({'x' : 'y'})
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711

In [15]: join.withColumnRenamed('value', 'new_value')
Out[15]: DataFrame[id: bigint, a: string, new_value: string, new_value: string]

In [16]: join.withColumnsRenamed({'value' : 'new_value'})
AnalysisException: [COLUMN_ALREADY_EXISTS] The column `new_value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
```

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
updated tests

### Was this patch authored or co-authored using generative AI tooling?
no
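The `withColumnRenamed` outputs in the session above follow a simple rule: every column whose name matches is renamed, unknown names are ignored, and no duplicate check is applied to the result. A small model of that rule over a list of column names is sketched below. `rename_columns` is a hypothetical helper, not a PySpark function, and it assumes that the behavior this PR aims for in `withColumnsRenamed` is the same per-name mapping without the duplicate check.

```python
from typing import Dict, List


def rename_columns(columns: List[str], renames: Dict[str, str]) -> List[str]:
    """Rename every matching column name; leave all other names untouched.

    No duplicate-name validation is performed, mirroring the
    withColumnRenamed outputs shown in the session above.
    """
    return [renames.get(name, name) for name in columns]


# Column names of the joined dataframe from the session (Out[8]).
joined = ["id", "a", "value", "value"]
renamed = rename_columns(joined, {"id": "value"})
```

Applied to the four renames in the session, the model reproduces the column lists of `Out[9]`, `Out[11]`, `Out[13]`, and `Out[15]`.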
Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]
HeartSaVioR commented on PR #45428: URL: https://github.com/apache/spark/pull/45428#issuecomment-1985161811 FOLLOWUP tag should be OK. Thanks for handling this.
Re: [PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]
HyukjinKwon closed pull request #45429: [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references URL: https://github.com/apache/spark/pull/45429
Re: [PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]
HyukjinKwon commented on PR #45429: URL: https://github.com/apache/spark/pull/45429#issuecomment-1985159258 Merged to master.
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HeartSaVioR commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1985157244 Will review sooner rather than later, maybe by today.
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.TimeoutMode +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Class that adds tests to verify operations based on stateful processor handle + * used primarily in queries based on the `transformWithState` operator. 
+ */ +class StatefulProcessorHandleSuite extends SharedSparkSession + with BeforeAndAfter { + + before { +StateStore.stop() +require(!StateStore.isMaintenanceRunning) + } + + after { +StateStore.stop() +require(!StateStore.isMaintenanceRunning) + } + + import StateStoreTestsHelper._ + + val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) + + val schemaForValueRow: StructType = new StructType().add("value", BinaryType) + + private def keyExprEncoder: ExpressionEncoder[Any] = +Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]] + + private def newStoreProviderWithHandle(useColumnFamilies: Boolean): +RocksDBStateStoreProvider = { +newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0), + numColsPrefixKey = 0, + useColumnFamilies = useColumnFamilies) + } + + private def newStoreProviderWithHandle( +storeId: StateStoreId, +numColsPrefixKey: Int, +sqlConf: Option[SQLConf] = None, +conf: Configuration = new Configuration, +useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = { +val provider = new RocksDBStateStoreProvider() +provider.init( + storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = numColsPrefixKey, + useColumnFamilies, + new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf) +provider + } + + private def tryWithProviderResource[T]( +provider: StateStoreProvider)(f: StateStoreProvider => T): T = { +try { + f(provider) +} finally { + provider.close() +} + } + + private def getTimeoutMode(timeoutMode: String): TimeoutMode = { +timeoutMode match { + case "NoTimeouts" => TimeoutMode.NoTimeouts() + case "ProcessingTime" => TimeoutMode.ProcessingTime() + case "EventTime" => TimeoutMode.EventTime() + case _ => throw new IllegalArgumentException(s"Invalid timeoutMode=$timeoutMode") +} + } + + Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode => +test(s"value state creation with timeoutMode=$timeoutMode should succeed") { + tryWithProviderResource(newStoreProviderWithHandle(true)) { 
provider => +val store = provider.getStore(0) +val handle = new StatefulProcessorHandleImpl(store, + UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode)) +assert(handle.getHandleState === StatefulProcessorHandleState.CREATED) +handle.getValueState[Long]("testState") + } +} + } + + private def verifyInvalidOperation( + handle: StatefulProcessorHandleImpl, + handleState: StatefulProcessorHandleState.Value, + errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = { +handle.setHandleState(handleState) +assert(handle.getHandleState === handleState) +val ex = intercept[Exception] { + fn(handle) +} +assert(ex.getMessage.contains(errorMsg)) + } + + private def createValueStateInstance(handle: StatefulProcessorHandleImpl): Unit = { +handle.getValueState[Long]("testState") + } + + private def registerTimer(handle: StatefulProcessorHan
Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]
LuciferYang commented on code in PR #45290: URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase { } } + test("aggregates count respects collation") { +Seq( + ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))), Review Comment: This test case failed in the daily test of Maven + Java 21, both on Linux and MacOS. @dbatomic Do you have time to investigate the cause of this failure? - linux: https://github.com/apache/spark/actions/runs/8189363924/job/22416511124 ``` - aggregates count respects collation *** FAILED *** Exception thrown while executing query: == Parsed Logical Plan == CTE [t] : +- 'SubqueryAlias t : +- 'Project ['collate('col1, unicode_CI) AS c#427560] :+- 'UnresolvedInlineTable [col1], [[AAA], [aaa]] +- 'Aggregate ['c], [unresolvedalias('COUNT(1)), 'c] +- 'UnresolvedRelation [t], [], false == Analyzed Logical Plan == count(1): bigint, c: string COLLATE 'UNICODE_CI' WithCTE :- CTERelationDef 106, false : +- SubqueryAlias t : +- Project [collate(col1#427562, unicode_CI) AS c#427560] :+- LocalRelation [col1#427562] +- Aggregate [c#427560], [count(1) AS count(1)#427563L, c#427560] +- SubqueryAlias t +- CTERelationRef 106, true, [c#427560], false == Optimized Logical Plan == Aggregate [c#427560], [count(1) AS count(1)#427563L, c#427560] +- LocalRelation [c#427560] == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Current Plan == SortAggregate(key=[c#427560], functions=[count(1)], output=[count(1)#427563L, c#427560]) +- Sort [c#427560 ASC NULLS FIRST], false, 0 +- ShuffleQueryStage 0 +- Exchange hashpartitioning(c#427560, 5), ENSURE_REQUIREMENTS, [plan_id=435997] +- SortAggregate(key=[c#427560], functions=[partial_count(1)], output=[c#427560, count#427567L]) +- *(1) Sort [c#427560 ASC NULLS FIRST], false, 0 +- *(1) LocalTableScan [c#427560] +- == Initial Plan == 
SortAggregate(key=[c#427560], functions=[count(1)], output=[count(1)#427563L, c#427560]) +- Sort [c#427560 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(c#427560, 5), ENSURE_REQUIREMENTS, [plan_id=435931] +- SortAggregate(key=[c#427560], functions=[partial_count(1)], output=[c#427560, count#427567L]) +- Sort [c#427560 ASC NULLS FIRST], false, 0 +- LocalTableScan [c#427560] == Exception == org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 393.0 failed 1 times, most recent failure: Lost task 1.0 in stage 393.0 (TID 394) (localhost executor driver): java.lang.StringIndexOutOfBoundsException: Index 3 out of bounds for length 3 at java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:55) at java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:52) at java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:213) at java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:210) at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:98) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106) at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302) at java.base/java.lang.String.checkIndex(String.java:4832) at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:46) at java.base/java.lang.String.charAt(String.java:1555) at com.ibm.icu.impl.coll.UTF16CollationIterator.handleNextCE32(UTF16CollationIterator.java:107) at com.ibm.icu.impl.coll.CollationIterator.nextCE(CollationIterator.java:247) at com.ibm.icu.impl.coll.CollationKeys.writeSortKeyUpToQuaternary(CollationKeys.java:374) at com.ibm.icu.text.RuleBasedCollator.writeSortKey(RuleBasedCollator.java:1159) at com.ibm.icu.text.RuleBasedCollator.getRawCollationKey(RuleBasedCollator.java:1146) at com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1071) at 
com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1064) at org.apache.spark.sql.catalyst.util.CollationFactory$Collation.lambda$new$2(CollationFactory.java:104) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$8(ShuffleExchangeExec.scala:330) at org.apache.spark.sql.exe
Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]
LuciferYang commented on code in PR #45290: URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase { } } + test("aggregates count respects collation") { +Seq( + ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))), Review Comment: This test case failed in the daily test of Maven + Java 21, both on Linux and MacOS. @dbatomic Do you have time to investigate the cause of this failure? - linux: https://github.com/apache/spark/actions/runs/8189363924/job/22416511124 ![image](https://github.com/apache/spark/assets/1475305/2b39b2d2-a4b2-45ec-8bd9-42c36f22a0b9) - macos-14: https://github.com/apache/spark/actions/runs/8194082205/job/22416483724 ``` - aggregates count respects collation *** FAILED *** Exception thrown while executing query: == Parsed Logical Plan == CTE [t] : +- 'SubqueryAlias t : +- 'Project ['collate('col1, unicode_CI) AS c#427467] :+- 'UnresolvedInlineTable [col1], [[aaa], [aaa]] +- 'Aggregate ['c], [unresolvedalias('COUNT(1)), 'c] +- 'UnresolvedRelation [t], [], false == Analyzed Logical Plan == count(1): bigint, c: string COLLATE 'UNICODE_CI' WithCTE :- CTERelationDef 103, false : +- SubqueryAlias t : +- Project [collate(col1#427469, unicode_CI) AS c#427467] :+- LocalRelation [col1#427469] +- Aggregate [c#427467], [count(1) AS count(1)#427470L, c#427467] +- SubqueryAlias t +- CTERelationRef 103, true, [c#427467], false == Optimized Logical Plan == Aggregate [c#427467], [count(1) AS count(1)#427470L, c#427467] +- LocalRelation [c#427467] == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Current Plan == SortAggregate(key=[c#427467], functions=[count(1)], output=[count(1)#427470L, c#427467]) +- Sort [c#427467 ASC NULLS FIRST], false, 0 +- ShuffleQueryStage 0 +- Exchange hashpartitioning(c#427467, 5), ENSURE_REQUIREMENTS, [plan_id=435580] +- SortAggregate(key=[c#427467], 
functions=[partial_count(1)], output=[c#427467, count#427474L]) +- *(1) Sort [c#427467 ASC NULLS FIRST], false, 0 +- *(1) LocalTableScan [c#427467] +- == Initial Plan == SortAggregate(key=[c#427467], functions=[count(1)], output=[count(1)#427470L, c#427467]) +- Sort [c#427467 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(c#427467, 5), ENSURE_REQUIREMENTS, [plan_id=435514] +- SortAggregate(key=[c#427467], functions=[partial_count(1)], output=[c#427467, count#427474L]) +- Sort [c#427467 ASC NULLS FIRST], false, 0 +- LocalTableScan [c#427467] == Exception == org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 387.0 failed 1 times, most recent failure: Lost task 0.0 in stage 387.0 (TID 387) (localhost executor driver): java.lang.StringIndexOutOfBoundsException: Index 4 out of bounds for length 3 at java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:55) at java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:52) at java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:213) at java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:210) at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:98) at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106) at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302) at java.base/java.lang.String.checkIndex(String.java:4832) at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:46) at java.base/java.lang.String.charAt(String.java:1555) at com.ibm.icu.impl.coll.UTF16CollationIterator.handleNextCE32(UTF16CollationIterator.java:107) at com.ibm.icu.impl.coll.CollationIterator.nextCE(CollationIterator.java:247) at com.ibm.icu.impl.coll.CollationKeys.writeSortKeyUpToQuaternary(CollationKeys.java:374) at com.ibm.icu.text.RuleBasedCollator.writeSortKey(RuleBasedCollator.java:1159) at 
com.ibm.icu.text.RuleBasedCollator.getRawCollationKey(RuleBasedCollator.java:1146) at com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1071) at com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1064) at org.apache.spark.sql.catalyst.util.CollationFactory$Collation.lambda$new$2(CollationFactory.java:104) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.a
Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]
LuciferYang commented on code in PR #45290: URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase { } } + test("aggregates count respects collation") { +Seq( + ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))), Review Comment: This test case failed in the daily test of Maven + Java 21, both on Linux and MacOS. @dbatomic Do you have time to investigate the cause of this failure? - linux: https://github.com/apache/spark/actions/runs/8189363924/job/22416511124 ![image](https://github.com/apache/spark/assets/1475305/2b39b2d2-a4b2-45ec-8bd9-42c36f22a0b9) - macos-14: https://github.com/apache/spark/actions/runs/8194082205/job/22416483724 ![image](https://github.com/apache/spark/assets/1475305/280c413b-71d6-4715-80a6-1f5ae27a5097) also cc @HyukjinKwon because I noticed that you are paying attention to the tests of maven + Java 21 on macos-14 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]
uros-db commented on code in PR #45422: URL: https://github.com/apache/spark/pull/45422#discussion_r1517236308 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types.{DataType, StringType} + +object CollationUtils { + def checkCollationCompatibility( + superCheck: => TypeCheckResult, Review Comment: I could bail out for each function individually, but I think it's essentially the same thing? Only this way, there's less copy-paste across the codebase.
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1517229022 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -410,7 +412,9 @@ public boolean endsWith(final UTF8String suffix, int collationId) { if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) { return this.toLowerCase().endsWith(suffix.toLowerCase()); } -return matchAt(suffix, numBytes - suffix.numBytes, collationId); +if (suffix.numBytes == 0 || this.numBytes == 0) return suffix.numBytes==0; + +return CollationFactory.getStringSearch(this, suffix, collationId).last()==this.numChars()-suffix.numChars(); Review Comment: for clarity, I think we can separate this out into `private boolean collatedEndsWith(...)`
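The empty-string guard and the "last match position" check in the hunk above can be sketched in isolation. This is only an illustration under stated assumptions: it uses plain Python strings and `str.rfind` in place of ICU's `StringSearch`, so it compares code points exactly and says nothing about collation-aware matching, and the helper name `ends_with` is hypothetical rather than anything in `UTF8String`.

```python
def ends_with(text: str, suffix: str) -> bool:
    """Mirror the shape of the endsWith logic using exact (binary) matching."""
    # Guard: if either side is empty, the result depends only on the suffix.
    # An empty suffix matches everything; a non-empty suffix never matches "".
    if len(suffix) == 0 or len(text) == 0:
        return len(suffix) == 0
    # The suffix matches iff its last occurrence starts exactly where a
    # suffix of that length would have to start.
    return text.rfind(suffix) == len(text) - len(suffix)
```

Pulling the logic into one small helper, as the review comment suggests for `collatedEndsWith(...)`, keeps the guard and the position check next to each other, which makes the edge cases easier to test on their own.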
[PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]
zhengruifeng opened a new pull request, #45429: URL: https://github.com/apache/spark/pull/45429 ### What changes were proposed in this pull request? Add `VariantType` to API references ### Why are the changes needed? `VariantType` has been added in `__all__` in `types` ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1517226969 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -396,7 +396,9 @@ public boolean startsWith(final UTF8String prefix, int collationId) { if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) { return this.toLowerCase().startsWith(prefix.toLowerCase()); } -return matchAt(prefix, 0, collationId); Review Comment: if there's code in `UTF8String` that's no longer used, we should delete it (for example, the overloaded `matchAt`)
Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support complex types of data [spark]
panbingkun commented on PR #44665: URL: https://github.com/apache/spark/pull/44665#issuecomment-1985106638 Friendly ping @HyukjinKwon: when you are not busy, could you please continue reviewing this PR?
Re: [PR] [SPARK-46510][CORE] Spark shell log filter should be applied to all AbstractAppender [spark]
AngersZh closed pull request #44496: [SPARK-46510][CORE] Spark shell log filter should be applied to all AbstractAppender URL: https://github.com/apache/spark/pull/44496
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1517209325 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: Python will always fail at runtime, so it mostly comes down to which error message is easier to understand... But, I don't know, an IDE could come to the rescue. @HyukjinKwon Would an IDE be able to point it out if we define this as abstract and the implementing class does not implement it?
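To make the trade-off in the comment above concrete, here is a minimal sketch of the two options in plain Python. The class names are hypothetical stand-ins, not the actual `DataSourceStreamReader` from the PR; only the `initialOffset` method name is taken from the diff. With `@abstractmethod`, an incomplete subclass fails as soon as it is instantiated; with a default body that raises, it fails only when the method is called.

```python
from abc import ABC, abstractmethod


class AbstractReader(ABC):
    """Hypothetical reader that declares initialOffset as abstract."""

    @abstractmethod
    def initialOffset(self) -> dict:
        ...


class DefaultRaisingReader:
    """Hypothetical reader whose default initialOffset raises when called."""

    def initialOffset(self) -> dict:
        raise NotImplementedError("initialOffset is not implemented")


class IncompleteAbstract(AbstractReader):
    """Subclass that forgets to implement initialOffset."""


class IncompleteDefault(DefaultRaisingReader):
    """Subclass that forgets to override initialOffset."""


def failure_point(cls) -> str:
    """Report where an incomplete subclass fails: at instantiation or at call."""
    try:
        reader = cls()
    except TypeError:
        # ABCMeta refuses to instantiate a class with unimplemented abstract methods.
        return "instantiation"
    try:
        reader.initialOffset()
    except NotImplementedError:
        return "call"
    return "none"
```

Both variants still fail only at runtime, as the comment notes. The abstract variant fails earlier, and static checkers such as mypy can generally flag the instantiation of a class with unimplemented abstract methods, which is likely the kind of IDE support the question is about.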
Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]
LuciferYang commented on PR #45428: URL: https://github.com/apache/spark/pull/45428#issuecomment-1985100729 Thanks @HyukjinKwon
Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]
LuciferYang commented on PR #45428: URL: https://github.com/apache/spark/pull/45428#issuecomment-1985099843 also cc @dongjoon-hyun
Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]
LuciferYang commented on PR #45428: URL: https://github.com/apache/spark/pull/45428#issuecomment-1985094500 This is my first time handling such a situation. Is it better to create a new Jira, or to treat this as a FOLLOWUP of SPARK-47305? cc @HyukjinKwon @HeartSaVioR @zhengruifeng
[PR] [SPARK-47305][SQL][TESTS] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]
LuciferYang opened a new pull request, #45428: URL: https://github.com/apache/spark/pull/45428 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]
HeartSaVioR commented on code in PR #45051: URL: https://github.com/apache/spark/pull/45051#discussion_r1517119556 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.Serializable + +import org.apache.spark.annotation.{Evolving, Experimental} + +/** + * Class used to provide access to expired timer's expiry time and timeout mode. These values Review Comment: nit: Technically the timeout mode is not visible with trait. If that's intentional, probably remove that part in the interface doc. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode} + +/** + * Class that provides a concrete implementation that can be used to provide access to expired + * timer's expiry time and timeout mode. These values are only relevant if the ExpiredTimerInfo Review Comment: nit: same, the timeout mode is not visible to the user function AFAIK. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -103,8 +116,12 @@ case class TransformWithStateExec( val keyObj = getKeyObj(keyRow) // convert key to objects ImplicitGroupingKeyTracker.setImplicitKey(keyObj) val valueObjIter = valueRowIter.map(getValueObj.apply) -val mappedIterator = statefulProcessor.handleInputRows(keyObj, valueObjIter, - new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents)).map { obj => +val mappedIterator = statefulProcessor.handleInputRows( + keyObj, + valueObjIter, + new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents), + new ExpiredTimerInfoImpl(false) Review Comment: super nit / 2 cents: passing the boolean as a named parameter would be much more readable in a non-IDE environment. This is really a general suggestion and preference, so feel free to ignore it.
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl( override def getQueryInfo(): QueryInfo = currQueryInfo + private def getTimerState[T](): TimerStateImpl[T] = { +new TimerStateImpl[T](store, timeoutMode, keyEncoder) + } + + private val timerState = getTimerState[Boolean]() + + override def registerTimer(expiryTimestampMs: Long): Unit = { +if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) { + throw StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString) +} + +if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) { + throw StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString) +} + +if (timerState.exists(expiryTimestampMs)) { + logWarning(s"Timer already exists for expiryTimestampMs=$expiryTimestampMs") Review Comment: I'm OK with moving the logging to TimerStateImpl if it helps to solve this. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
yaooqinn commented on PR #45424: URL: https://github.com/apache/spark/pull/45424#issuecomment-1985053630 Thanks, merged to master
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
yaooqinn closed pull request #45424: [SPARK-47319][SQL] Improve missingInput calculation URL: https://github.com/apache/spark/pull/45424
Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]
yaooqinn commented on code in PR #45418: URL: https://github.com/apache/spark/pull/45418#discussion_r1517168161 ## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala: ## @@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with Logging { */ def canHandle(url : String): Boolean + @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0") + def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + /** * Get the custom datatype mapping for the given jdbc meta information. * @param sqlType The sql type (see java.sql.Types) * @param typeName The sql type name (e.g. "BIGINT UNSIGNED") * @param size The size of the type. * @param md Result metadata associated with this type. + * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not. * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]]) * or null if the default type mapping should be used. */ def getCatalystType( -sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder, + isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, typeName, size, md) Review Comment: Comments addressed, please take a look again when you are available
Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]
yaooqinn commented on code in PR #45418: URL: https://github.com/apache/spark/pull/45418#discussion_r1517155629 ## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala: ## @@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with Logging { */ def canHandle(url : String): Boolean + @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0") + def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + /** * Get the custom datatype mapping for the given jdbc meta information. * @param sqlType The sql type (see java.sql.Types) * @param typeName The sql type name (e.g. "BIGINT UNSIGNED") * @param size The size of the type. * @param md Result metadata associated with this type. + * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not. * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]]) * or null if the default type mapping should be used. */ def getCatalystType( -sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder, + isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, typeName, size, md) Review Comment: Indeed, the precision is in the method parameters, and the scale is in the meta:)
Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]
HyukjinKwon commented on PR #45360: URL: https://github.com/apache/spark/pull/45360#issuecomment-1984994353 Is this good to go? @HeartSaVioR @rangadi
Re: [PR] [SPARK-47265][SQL][TESTS] Replace `createTable(..., schema: StructType, ...)` with `createTable(..., columns: Array[Column], ...)` in UT [spark]
LuciferYang commented on code in PR #45368: URL: https://github.com/apache/spark/pull/45368#discussion_r1517123366 ## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala: ## @@ -84,28 +85,28 @@ class BasicInMemoryTableCatalog extends TableCatalog { invalidatedTables.add(ident) } - // TODO: remove it when no tests calling this deprecated method. + // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` Review Comment: I suggest adding some new comments to explain the reason for retaining this API and throwing exceptions by default while removing the `TODO`
Re: [PR] [SPARK-47265][SQL][TESTS] Replace `createTable(..., schema: StructType, ...)` with `createTable(..., columns: Array[Column], ...)` in UT [spark]
cloud-fan commented on code in PR #45368: URL: https://github.com/apache/spark/pull/45368#discussion_r1517121687 ## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala: ## @@ -84,28 +85,28 @@ class BasicInMemoryTableCatalog extends TableCatalog { invalidatedTables.add(ident) } - // TODO: remove it when no tests calling this deprecated method. + // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` Review Comment: I think we can remove the TODO now. We should never remove the API, and implementations will have to provide a fake implementation that just fails.
Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]
cloud-fan commented on code in PR #45418: URL: https://github.com/apache/spark/pull/45418#discussion_r1517121075 ## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala: ## @@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with Logging { */ def canHandle(url : String): Boolean + @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0") + def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + /** * Get the custom datatype mapping for the given jdbc meta information. * @param sqlType The sql type (see java.sql.Types) * @param typeName The sql type name (e.g. "BIGINT UNSIGNED") * @param size The size of the type. * @param md Result metadata associated with this type. + * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not. * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]]) * or null if the default type mapping should be used. */ def getCatalystType( -sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder, + isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, typeName, size, md) Review Comment: I feel this is not a good API design. Are we going to keep deprecating and adding new overloads with extra parameters every time we want to pass more information? AFAIK we are already using `MetadataBuilder` to carry decimal precision information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
cloud-fan commented on code in PR #45424: URL: https://github.com/apache/spark/pull/45424#discussion_r1517119767 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala: ## @@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute * in `other`. */ def --(other: Iterable[NamedExpression]): AttributeSet = { Review Comment: This can be more efficient, but looks weird in standard collection APIs such as `def --`
Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on code in PR #45232: URL: https://github.com/apache/spark/pull/45232#discussion_r1517085186 ## python/pyspark/resource/tests/test_connect_resources.py: ## @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests +from pyspark.sql import SparkSession + + +class ResourceProfileTests(unittest.TestCase): +def test_profile_before_sc_for_connect(self): +rpb = ResourceProfileBuilder() +treqs = TaskResourceRequests().cpus(2) +# no exception for building ResourceProfile +rp = rpb.require(treqs).build Review Comment: Yes, Added the checking in the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on code in PR #45232: URL: https://github.com/apache/spark/pull/45232#discussion_r1517085041 ## python/pyspark/resource/tests/test_connect_resources.py: ## @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests +from pyspark.sql import SparkSession + + +class ResourceProfileTests(unittest.TestCase): +def test_profile_before_sc_for_connect(self): Review Comment: Added the error checking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on code in PR #45232: URL: https://github.com/apache/spark/pull/45232#discussion_r1517084819 ## python/pyspark/resource/profile.py: ## @@ -114,14 +122,23 @@ def id(self) -> int: int A unique id of this :class:`ResourceProfile` """ +with self._lock: +if self._id is None: +if self._java_resource_profile is not None: +self._id = self._java_resource_profile.id() +else: +from pyspark.sql import is_remote -if self._java_resource_profile is not None: -return self._java_resource_profile.id() -else: -raise RuntimeError( -"SparkContext must be created to get the id, get the id " -"after adding the ResourceProfile to an RDD" -) +if is_remote(): +from pyspark.sql.connect.resource.profile import ResourceProfile + +rp = ResourceProfile( Review Comment: Done, added a comment.
Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on code in PR #45232: URL: https://github.com/apache/spark/pull/45232#discussion_r1517084721 ## python/pyspark/resource/profile.py: ## @@ -114,14 +122,23 @@ def id(self) -> int: int A unique id of this :class:`ResourceProfile` """ +with self._lock: +if self._id is None: +if self._java_resource_profile is not None: +self._id = self._java_resource_profile.id() +else: +from pyspark.sql import is_remote -if self._java_resource_profile is not None: -return self._java_resource_profile.id() -else: -raise RuntimeError( -"SparkContext must be created to get the id, get the id " -"after adding the ResourceProfile to an RDD" -) +if is_remote(): +from pyspark.sql.connect.resource.profile import ResourceProfile + +rp = ResourceProfile( Review Comment: Yes, you're right. In remote mode, the ResourceProfile acts as a proxy for the Spark ResourceProfile on the server side.
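The pattern in the diff above, resolving the id once under a lock and caching it, can be sketched in isolation. This is a minimal illustration, not the PySpark implementation: `LazyIdProfile`, `fetch_id`, and `fake_fetch` are hypothetical names, and the fetch callable merely stands in for wherever the real id comes from (the JVM profile, or the server in remote mode).

```python
import threading
from typing import Callable, List, Optional


class LazyIdProfile:
    """Hypothetical stand-in for a profile whose id is resolved on first use."""

    def __init__(self, fetch_id: Callable[[], int]) -> None:
        self._fetch_id = fetch_id  # assumed: e.g. a round trip to the server
        self._id: Optional[int] = None
        self._lock = threading.Lock()

    @property
    def id(self) -> int:
        # Hold the lock so concurrent callers trigger at most one fetch.
        with self._lock:
            if self._id is None:
                self._id = self._fetch_id()
            return self._id


calls: List[int] = []


def fake_fetch() -> int:
    """Record each invocation so the caching can be observed."""
    calls.append(1)
    return 7


profile = LazyIdProfile(fake_fetch)
first, second = profile.id, profile.id  # the second access reuses the cached id
```

Caching the id means a proxy object only has to contact its backing side once, no matter how often `id` is read.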
Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]
yaooqinn commented on PR #45408: URL: https://github.com/apache/spark/pull/45408#issuecomment-1984930529 As the Spark community did not receive any issue reports during the v3.3.0 - v3.5.1 releases, I think this is a corner case. Maybe we can make the config internal.
Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]
dongjoon-hyun commented on PR #45408: URL: https://github.com/apache/spark/pull/45408#issuecomment-1984929745 +1 for the direction if we need to support both.
Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]
yaooqinn commented on PR #45408: URL: https://github.com/apache/spark/pull/45408#issuecomment-1984926315 Thank you @dongjoon-hyun. In such circumstances, I guess we can add a configuration for base64 classes to avoid breaking things again. AFAIK, Apache Hive also uses the JDK version, and I think the majority of Spark users talk to Hive heavily using Spark SQL.
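To make the chunking difference at stake in this thread concrete, here is a minimal sketch in Python rather than Spark code. It uses Python's standard `base64` module as a stand-in: `encodebytes` produces MIME-style (RFC 2045) output with a newline after at most 76 characters, while `b64encode` produces a single unbroken line (RFC 4648). The strict decode call is only an assumption about how a non-MIME consumer might behave.

```python
import base64
import binascii

payload = bytes(range(100))  # 100 bytes encode to 136 base64 characters

# MIME-style output: line breaks are inserted into the encoded text.
chunked = base64.encodebytes(payload)

# Plain output: one unbroken line.
plain = base64.b64encode(payload)

# A strict decoder, standing in for a non-MIME consumer, rejects the newlines.
try:
    base64.b64decode(chunked, validate=True)
    strict_accepts_chunked = True
except binascii.Error:
    strict_accepts_chunked = False

# A lenient decoder discards characters outside the alphabet and round-trips.
lenient_roundtrip = base64.b64decode(chunked) == payload
```

Apart from the line breaks, the two encodings carry identical characters, which is why a lenient reader tolerates both while a strict one fails only on the chunked form.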
Re: [PR] [SPARK-47314][DOC] Remove the wrong comment line of `ExternalSorter#writePartitionedMapOutput` method [spark]
yaooqinn commented on PR #45415: URL: https://github.com/apache/spark/pull/45415#issuecomment-1984919818 Thanks @zwangsheng, merged to master
Re: [PR] [SPARK-47314][DOC] Remove the wrong comment line of `ExternalSorter#writePartitionedMapOutput` method [spark]
yaooqinn closed pull request #45415: [SPARK-47314][DOC] Remove the wrong comment line of `ExternalSorter#writePartitionedMapOutput` method URL: https://github.com/apache/spark/pull/45415
Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]
yaooqinn commented on PR #45427: URL: https://github.com/apache/spark/pull/45427#issuecomment-1984918192 Late +1
Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]
HyukjinKwon closed pull request #45427: [MINOR][INFRA] Make "y/n" consistent within merge script URL: https://github.com/apache/spark/pull/45427
Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]
HyukjinKwon commented on PR #45427: URL: https://github.com/apache/spark/pull/45427#issuecomment-1984911418 Merged to master.
Re: [PR] [SPARK-47314][DOC] Correct the `ExternalSorter#writePartitionedMapOutput` method comment [spark]
zwangsheng commented on code in PR #45415: URL: https://github.com/apache/spark/pull/45415#discussion_r1517066704 ## core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala: ## @@ -690,7 +690,7 @@ private[spark] class ExternalSorter[K, V, C]( * Write all the data added into this ExternalSorter into a map output writer that pushes bytes * to some arbitrary backing store. This is called by the SortShuffleWriter. * - * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) + * Update the partition's length, when call partition pair writer to close. Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]
HyukjinKwon opened a new pull request, #45427: URL: https://github.com/apache/spark/pull/45427

### What changes were proposed in this pull request?

This PR makes the y/n prompt message and its condition consistent within the merge script.

### Why are the changes needed?

For consistency.

```
Would you like to use the modified body? (y/N): y
...
Proceed with merging pull request #45426? (y/N): y
...
Merge complete (local ref PR_TOOL_MERGE_PR_45426_MASTER). Push to apache? (y/N): y
...
Would you like to pick 9cac2bb6 into another branch? (y/N): n
...
Would you like to update an associated JIRA? (y/N): y
...
Check if the JIRA information is as expected (Y/n): y # <-- Inconsistent.
```

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
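A common command-line convention is that the capitalized letter in a `(y/N)` hint marks the answer taken on empty input. The sketch below shows one way to keep the hint and the parsing condition in step; `parse_yes_no` and `prompt_suffix` are hypothetical helpers for illustration, not functions from the merge script.

```python
def parse_yes_no(answer: str, default: bool = False) -> bool:
    """Interpret a reply to a "(y/N)" style prompt.

    Empty input falls back to `default`; otherwise only "y" or "yes" count as yes.
    """
    reply = answer.strip().lower()
    if not reply:
        return default
    return reply in ("y", "yes")


def prompt_suffix(default: bool = False) -> str:
    """Render the hint so the capital letter always matches the default."""
    return "(Y/n)" if default else "(y/N)"
```

Deriving both the hint and the fallback from the same `default` argument makes it hard for the displayed text and the actual condition to drift apart.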
Re: [PR] [SPARK-46992]Fix cache consistence [spark]
doki23 commented on PR #45181: URL: https://github.com/apache/spark/pull/45181#issuecomment-1984850287

> All children have to be considered for changes of their persistence state. Currently it only checks the first found child. For clarity there is a test which fails: [doki23#1](https://github.com/doki23/spark/pull/1)

So, we need a cache state signature for queryExecution.
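One way to picture a "cache state signature" is as a value computed over every node of the plan, so that a change in any child is visible. The following is a minimal sketch under that assumption; `PlanNode` and `cache_signature` are hypothetical names and do not reflect Spark's actual plan classes or the approach the PR ends up taking.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class PlanNode:
    """Hypothetical plan node; `cached` models its persistence state."""
    name: str
    cached: bool = False
    children: List["PlanNode"] = field(default_factory=list)


def cache_signature(node: PlanNode) -> Tuple[Tuple[str, bool], ...]:
    """Collect (name, cached) for every node in pre-order."""
    sig = [(node.name, node.cached)]
    for child in node.children:
        sig.extend(cache_signature(child))
    return tuple(sig)


left, right = PlanNode("left"), PlanNode("right")
root = PlanNode("join", children=[left, right])

before = cache_signature(root)
right.cached = True  # only the second child changes state
after = cache_signature(root)
```

A check that stops at the first child would miss the change to `right`; comparing the full signatures does not.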
Re: [PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]
HyukjinKwon closed pull request #45426: [SPARK-47309][SQL][XML] Fix schema inference issues in XML URL: https://github.com/apache/spark/pull/45426
Re: [PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]
HyukjinKwon commented on PR #45426: URL: https://github.com/apache/spark/pull/45426#issuecomment-1984850009 Merged to master.
Re: [PR] [SPARK-47078][DOCS][PYTHON] Documentation for SparkSession-based Profilers [spark]
HyukjinKwon closed pull request #45269: [SPARK-47078][DOCS][PYTHON] Documentation for SparkSession-based Profilers URL: https://github.com/apache/spark/pull/45269
Re: [PR] [SPARK-47078][DOCS][PYTHON] Documentation for SparkSession-based Profilers [spark]
HyukjinKwon commented on PR #45269: URL: https://github.com/apache/spark/pull/45269#issuecomment-1984848824 Merged to master.
Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]
HyukjinKwon commented on code in PR #45422: URL: https://github.com/apache/spark/pull/45422#discussion_r1517022572 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types.{DataType, StringType} + +object CollationUtils { + def checkCollationCompatibility( + superCheck: => TypeCheckResult, + collationId: Int, + rightDataType: DataType + ): TypeCheckResult = { +val checkResult = superCheck +if (checkResult.isFailure) return checkResult +// Additional check needed for collation compatibility +val rightCollationId: Int = rightDataType.asInstanceOf[StringType].collationId +if (collationId != rightCollationId) { + return DataTypeMismatch( +errorSubClass = "COLLATION_MISMATCH", +messageParameters = Map( + "collationNameLeft" -> CollationFactory.fetchCollation(collationId).collationName, + "collationNameRight" -> CollationFactory.fetchCollation(rightCollationId).collationName +) + ) +} +TypeCheckResult.TypeCheckSuccess + } + + final val SUPPORT_BINARY_ONLY: Int = 0 + final val SUPPORT_LOWERCASE: Int = 1 + final val SUPPORT_ALL_COLLATIONS: Int = 2 + + def checkCollationSupport( Review Comment: Let's keep the indentation proper, with 2/4 spaces (https://github.com/databricks/scala-style-guide)
Re: [PR] Miland db/miland legacy error class [spark]
HyukjinKwon commented on PR #45423: URL: https://github.com/apache/spark/pull/45423#issuecomment-1984835207 See also https://spark.apache.org/contributing.html
Re: [PR] Miland db/miland legacy error class [spark]
HyukjinKwon commented on PR #45423: URL: https://github.com/apache/spark/pull/45423#issuecomment-1984834978 Mind filing a JIRA and linking it to the PR title please?
Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]
github-actions[bot] closed pull request #42398: [SPARK-42746][SQL] Add the LISTAGG() aggregate function URL: https://github.com/apache/spark/pull/42398
Re: [PR] [SPARK-46034][CORE] SparkContext add file should also copy file to local root path [spark]
github-actions[bot] commented on PR #43936: URL: https://github.com/apache/spark/pull/43936#issuecomment-1984826472 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-46071][SQL] Optimize CaseWhen toJSON content [spark]
github-actions[bot] commented on PR #43979: URL: https://github.com/apache/spark/pull/43979#issuecomment-1984826451 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]
shujingyang-db opened a new pull request, #45426: URL: https://github.com/apache/spark/pull/45426

### What changes were proposed in this pull request?

This PR fixes XML schema inference issues:

1. when there's an empty tag
2. when merging schema for NullType

### Why are the changes needed?

Fix a bug

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No
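The PR text does not spell out the merge rule, so the following is only a sketch of one plausible rule for merging inferred types when one side is null: the null side carries no information, so the other side wins. The type names, the `merge_types` helper, and the string fallback for conflicting types are all assumptions for illustration, not the behavior of Spark's XML inference.

```python
NULL = "null"      # stands in for NullType
STRING = "string"  # assumed fallback when two types genuinely conflict


def merge_types(a: str, b: str) -> str:
    """Merge two inferred field types (type names are illustrative)."""
    if a == b:
        return a
    if a == NULL:
        return b  # null carries no information, so keep the other side
    if b == NULL:
        return a
    return STRING  # assumed fallback for conflicting non-null types
```

Under this rule a field seen once as null and once as `long` is inferred as `long`, and a field that is null in every record stays `null`.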
Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]
xinrong-meng commented on PR #45378: URL: https://github.com/apache/spark/pull/45378#issuecomment-1984523232 Merged to master, thank you all!
Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]
xinrong-meng closed pull request #45378: [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling URL: https://github.com/apache/spark/pull/45378
Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]
dongjoon-hyun commented on PR #45408: URL: https://github.com/apache/spark/pull/45408#issuecomment-1984433848

Thank you for the confirmation, @ted-jenks. Well, in this case, it's too late to change the behavior again. Apache Spark 3.3 has been EOL since last year, and I don't think we need to change the behavior for Apache Spark 3.4.3 and 3.5.2, because the Apache Spark community had no such official contract before. It would have been great if you had participated in the Apache Spark 3.3.0 RC votes at that time.

> > It sounds like you have other systems to read Spark's data.
>
> Correct. The issue was that from 3.2 to 3.3 there was a behavior change in the base64 encodings used in Spark. Previously, they did not chunk. Now, they do. Chunked base64 cannot be read by non-MIME-compatible base64 decoders, so the data output by Spark looks corrupt to systems following the normal base64 standard.
>
> I think the best path forward is to use MIME encoding/decoding without chunking, as this is the most fault tolerant: existing use cases will not break, and the pre-3.3 base64 behavior is upheld.

However, I understand and agree with @ted-jenks's point as an official nice-to-have for Apache Spark 4+. In other words, if we want to merge this PR, we need to make it official from Apache Spark 4.0.0 and protect it as a kind of developer interface for all future releases. Do you think that's okay, @ted-jenks? BTW, what do you think about this proposal, @yaooqinn (the original author of #35110), @cloud-fan, and @HyukjinKwon?
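The chunking difference described in the comment above can be reproduced with Python's standard library. This is only an illustrative sketch, not Spark's code: `base64.encodebytes` stands in for an RFC 2045 (MIME) encoder and `base64.b64encode` for an RFC 4648 encoder, and the helper names `strict_decode` and `lenient_decode` are made up for this example. Python's MIME-style encoder breaks lines with `\n`; other implementations may use a different line separator.

```python
import base64
import binascii

# 256 bytes of input, long enough for the encoded form to span several lines.
payload = bytes(range(256))

# MIME-style (RFC 2045) output: a line break after every 76 encoded characters.
chunked = base64.encodebytes(payload)

# RFC 4648 output: one unbroken run of base64 characters.
unchunked = base64.b64encode(payload)


def strict_decode(data: bytes) -> bytes:
    """Decode like a strict RFC 4648 reader: any non-alphabet byte is an error."""
    return base64.b64decode(data, validate=True)


def lenient_decode(data: bytes) -> bytes:
    """Decode like a MIME reader: line breaks in the input are ignored."""
    return base64.decodebytes(data)


if __name__ == "__main__":
    print(lenient_decode(chunked) == payload)    # MIME reader accepts chunked data
    print(lenient_decode(unchunked) == payload)  # and unchunked data as well
    try:
        strict_decode(chunked)
    except binascii.Error as exc:
        print("strict reader rejects chunked data:", exc)
```

The sketch shows why unchunked output is the more portable choice: a lenient reader accepts both forms, while a strict reader accepts only the unchunked one.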
Re: [PR] [SPARK-47311][SQL][PYTHON] Suppress Python exceptions where PySpark is not in the Python path [spark]
xinrong-meng commented on PR #45414: URL: https://github.com/apache/spark/pull/45414#issuecomment-1984375769 Looks nice, thank you!
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1516770647 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: Why is it O(n^2)? `RewriteWithExpression` would replace all WITH expressions in one pass when we first call it, wouldn't it? (I know that technically it is also called as part of the optimization loop in OptimizeSubqueries, but by that point all the WITH expressions have been replaced.)
Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]
xinrong-meng commented on code in PR #45378: URL: https://github.com/apache/spark/pull/45378#discussion_r1516752307 ## python/pyspark/sql/tests/test_session.py: ## @@ -531,6 +531,33 @@ def test_dump_invalid_type(self): }, ) +def test_clear_memory_type(self): Review Comment: Good idea! For now, all logic tested by SparkSessionProfileTests is directly imported in Spark Connect with no modification. But I do agree separating it later will improve readability and ensure future parity. I'll refactor later. Thanks!
Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]
ueshin commented on code in PR #45378: URL: https://github.com/apache/spark/pull/45378#discussion_r1516750441 ## python/pyspark/sql/profiler.py: ## @@ -224,6 +224,54 @@ def dump(id: int) -> None: for id in sorted(code_map.keys()): dump(id) +def clear_perf_profiles(self, id: Optional[int] = None) -> None: +""" +Clear the perf profile results. + +.. versionadded:: 4.0.0 Review Comment: Actually this is not. The `clear` in `Profile` should be a user-facing API.
Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]
xinrong-meng commented on code in PR #45378: URL: https://github.com/apache/spark/pull/45378#discussion_r1516752307 ## python/pyspark/sql/tests/test_session.py: ## @@ -531,6 +531,33 @@ def test_dump_invalid_type(self): }, ) +def test_clear_memory_type(self): Review Comment: Good idea! For now, all logic tested by SparkSessionProfileTests is directly imported in Spark Connect with no modification. But I do agree separating it later will improve readability. I'll refactor later. Thanks!
[PR] [SPARK-47318][Security] Adds HKDF round to AuthEngine key derivation [spark]
sweisdb opened a new pull request, #45425: URL: https://github.com/apache/spark/pull/45425 ### What changes were proposed in this pull request? This change adds an additional pass through a key derivation function (KDF) to the key exchange protocol in `AuthEngine`. Currently, it uses the shared secret from a bespoke key negotiation protocol directly. This is an encoded X coordinate on the X25519 curve. It is atypical and not recommended to use that coordinate directly as a key; it should instead be passed to a KDF. Note, Spark now supports TLS for RPC calls. It is preferable to use that rather than the bespoke AES RPC encryption implemented by `AuthEngine` and `TransportCipher`. ### Why are the changes needed? This follows best practices of key negotiation protocols. The encoded X coordinate is not guaranteed to be uniformly distributed over the 32-byte key space. Rather, we pass it through HKDF to map it uniformly to a 16-byte key space. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests under: `build/sbt "network-common/test:testOnly"` Specifically: `build/sbt "network-common/test:testOnly org.apache.spark.network.crypto.AuthEngineSuite"` `build/sbt "network-common/test:testOnly org.apache.spark.network.crypto.AuthIntegrationSuite"` ### Was this patch authored or co-authored using generative AI tooling? No.
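The HKDF step described in the PR summary above can be sketched as follows. This is a from-scratch HKDF-SHA256 following the extract-then-expand construction of RFC 5869, written with Python's standard library only; it is not the code from the PR. The salt, the `info` label, and the placeholder shared secret are assumptions made up for illustration, since the PR text does not state which parameters `AuthEngine` uses.

```python
import hashlib
import hmac

HASH_LEN = 32  # SHA-256 output size in bytes


def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int) -> bytes:
    """Derive `length` bytes from input keying material using HKDF-SHA256."""
    if length > 255 * HASH_LEN:
        raise ValueError("requested output is too long for HKDF-SHA256")
    if not salt:
        # RFC 5869: an absent salt is treated as HASH_LEN zero bytes.
        salt = bytes(HASH_LEN)
    # Extract: concentrate the input entropy into a pseudorandom key.
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()
    # Expand: chain HMAC blocks until enough output has been produced.
    okm = b""
    block = b""
    counter = 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


if __name__ == "__main__":
    # Placeholder for a 32-byte X25519 shared secret (hypothetical value).
    shared_secret = bytes(range(32))
    # Hypothetical context label; the PR does not say what label is used.
    key = hkdf_sha256(shared_secret, salt=b"", info=b"example-session-key", length=16)
    print(len(key), key.hex())
```

The point of the extra pass is that the derived 16-byte key depends on the whole shared secret through a keyed hash, rather than being a raw slice of a curve coordinate.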
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
attilapiros commented on code in PR #45424: URL: https://github.com/apache/spark/pull/45424#discussion_r1516669562 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala: ## @@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute * in `other`. */ def --(other: Iterable[NamedExpression]): AttributeSet = { Review Comment: and then we can save here what in the `missingInput()` was saved in your previous commit (the calculation of the `inputSet`)
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
attilapiros commented on code in PR #45424: URL: https://github.com/apache/spark/pull/45424#discussion_r1516651884 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala: ## @@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute * in `other`. */ def --(other: Iterable[NamedExpression]): AttributeSet = { Review Comment: @peter-toth What about changing the `other` to a call-by-name parameter? ```suggestion def --(other: => Iterable[NamedExpression]): AttributeSet = { ```
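The effect of the call-by-name suggestion above can be illustrated in Python, which has no by-name parameters: a zero-argument callable plays the same role by deferring the computation until it is actually needed. The function `minus` and its string elements are hypothetical stand-ins for `AttributeSet.--` and its attributes, and the early return for an empty left-hand side is an assumption about where the saving would come from.

```python
from typing import Callable, Iterable, Set


def minus(base: Set[str], other: Callable[[], Iterable[str]]) -> Set[str]:
    """Return `base` without the elements produced by `other()`.

    `other` is only called when `base` is non-empty, so a costly
    right-hand side is never computed for an empty left-hand side.
    """
    if not base:
        return set()
    return base - set(other())


if __name__ == "__main__":
    def expensive_input_set() -> Iterable[str]:
        print("computing input set")  # visible side effect for the demo
        return ["a", "b"]

    print(minus(set(), expensive_input_set))       # nothing is computed
    print(minus({"a", "c"}, expensive_input_set))  # computed once
```

In Scala the `=> Iterable[NamedExpression]` form gives this deferral without changing call sites, which is presumably why it was proposed as a suggestion on the signature alone.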
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
peter-toth commented on PR #45424: URL: https://github.com/apache/spark/pull/45424#issuecomment-1984153122 @cloud-fan can you please take a look?
Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]
attilapiros commented on PR #45424: URL: https://github.com/apache/spark/pull/45424#issuecomment-1984150861 LGTM. I talked to @peter-toth offline, and the improvement comes from not calculating the `inputSet` at all when `references` is empty.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1516609093 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) Review Comment: Do we need to set this config? `workerFactory.createSimpleWorker(blockingMode = true)` already does not use daemon mode.
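The class comment in the diff above describes a worker loop that reads a function identifier from the socket, invokes the matching reader method, and sends the result back. A minimal sketch of that dispatch pattern follows. Only the four function IDs and the `initialOffset` method name come from the quoted diffs; the wire format (4-byte big-endian integers and length-prefixed JSON), the other method names, and the `CountingReader` class are assumptions for illustration, and in-memory buffers stand in for the socket.

```python
import io
import json
import struct

# Function IDs as listed in the quoted Scala object.
INITIAL_OFFSET_FUNC_ID = 884
LATEST_OFFSET_FUNC_ID = 885
PARTITIONS_FUNC_ID = 886
COMMIT_FUNC_ID = 887


def write_int(out, value):
    out.write(struct.pack("!i", value))


def read_int(inp):
    data = inp.read(4)
    if len(data) < 4:
        raise EOFError("stream closed")
    return struct.unpack("!i", data)[0]


def write_json(out, obj):
    payload = json.dumps(obj).encode("utf-8")
    write_int(out, len(payload))
    out.write(payload)


def read_json(inp):
    return json.loads(inp.read(read_int(inp)).decode("utf-8"))


class CountingReader:
    """Hypothetical stream reader whose offsets are plain counters."""

    def __init__(self):
        self.latest = 0
        self.committed = None

    def initialOffset(self):
        return {"offset": 0}

    def latestOffset(self):
        self.latest += 2
        return {"offset": self.latest}

    def partitions(self, start, end):
        return [{"index": i} for i in range(start["offset"], end["offset"])]

    def commit(self, end):
        self.committed = end


def serve(reader, inp, out):
    """Read function IDs until the stream ends, dispatching each to the reader."""
    while True:
        try:
            func_id = read_int(inp)
        except EOFError:
            return
        if func_id == INITIAL_OFFSET_FUNC_ID:
            write_json(out, reader.initialOffset())
        elif func_id == LATEST_OFFSET_FUNC_ID:
            write_json(out, reader.latestOffset())
        elif func_id == PARTITIONS_FUNC_ID:
            start, end = read_json(inp), read_json(inp)
            write_json(out, reader.partitions(start, end))
        elif func_id == COMMIT_FUNC_ID:
            reader.commit(read_json(inp))
            write_int(out, 0)  # acknowledge the commit
        else:
            raise ValueError(f"unknown function id: {func_id}")


def run_demo():
    """Play the JVM side: queue four requests, then collect the replies."""
    requests = io.BytesIO()
    write_int(requests, INITIAL_OFFSET_FUNC_ID)
    write_int(requests, LATEST_OFFSET_FUNC_ID)
    write_int(requests, PARTITIONS_FUNC_ID)
    write_json(requests, {"offset": 0})
    write_json(requests, {"offset": 2})
    write_int(requests, COMMIT_FUNC_ID)
    write_json(requests, {"offset": 2})
    requests.seek(0)
    replies = io.BytesIO()
    reader = CountingReader()
    serve(reader, requests, replies)
    replies.seek(0)
    return [read_json(replies), read_json(replies), read_json(replies), read_int(replies)], reader


if __name__ == "__main__":
    print(run_demo()[0])
```

Using integer IDs keeps the JVM side simple: it writes one integer per call and then blocks on the reply, which fits the blocking-mode worker discussed in the review comment.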
Re: [PR] [SPARK-37932][SQL]Wait to resolve missing attributes before applying DeduplicateRelations [spark]
peter-toth commented on PR #35684: URL: https://github.com/apache/spark/pull/35684#issuecomment-1984107426 @martinf-moodys, [SPARK-47319](https://issues.apache.org/jira/browse/SPARK-47319) / https://github.com/apache/spark/pull/45424 might help, especially if you have many `Union` nodes
[PR] [SPARK-47319][SQL] Fix missingInput calculation [spark]
peter-toth opened a new pull request, #45424: URL: https://github.com/apache/spark/pull/45424 ### What changes were proposed in this pull request? This PR speeds up `QueryPlan.missingInput()` calculation. ### Why are the changes needed? This seems to be the root cause of `DeduplicateRelations` slowness in some cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. ### Was this patch authored or co-authored using generative AI tooling? No.
Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]
stefankandic commented on code in PR #45405: URL: https://github.com/apache/spark/pull/45405#discussion_r1516535896 ## sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala: ## @@ -117,7 +117,7 @@ object DataType { private val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r private val CHAR_TYPE = """char\(\s*(\d+)\s*\)""".r private val VARCHAR_TYPE = """varchar\(\s*(\d+)\s*\)""".r - private val COLLATED_STRING_TYPE = """string\s+COLLATE\s+'([\w_]+)'""".r + private val COLLATED_STRING_TYPE = """string\s+COLLATE\s+([\w_]+)""".r Review Comment: Nice catch! Since `collate_key_word_as_identifier` is false I guess we only need to support backticks. I added the tests for them.
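The regex change in the diff above, and the backtick case mentioned in the reply, can be tried out with Python's `re` module, whose syntax matches the Scala patterns for these constructs. The first two patterns are copied from the diff; `WITH_BACKTICKS` and `collation_name` are hypothetical, since the final pattern that handles backticks is not shown in this message.

```python
import re
from typing import Optional

# Pattern removed by the diff: the collation name is a quoted string literal.
OLD = re.compile(r"string\s+COLLATE\s+'([\w_]+)'")

# Pattern added by the diff: the collation name is a bare identifier.
NEW = re.compile(r"string\s+COLLATE\s+([\w_]+)")

# Hypothetical extension: also accept a backtick-quoted identifier.
WITH_BACKTICKS = re.compile(r"string\s+COLLATE\s+(?:`([\w_]+)`|([\w_]+))")


def collation_name(type_text: str) -> Optional[str]:
    """Return the collation name if the whole text is a collated string type."""
    match = WITH_BACKTICKS.fullmatch(type_text)
    if match is None:
        return None
    # Exactly one of the two alternatives captured the name.
    return match.group(1) or match.group(2)


if __name__ == "__main__":
    for text in ("string COLLATE UCS_BASIC",
                 "string COLLATE `UCS_BASIC`",
                 "string COLLATE 'UCS_BASIC'"):
        print(text, "->", collation_name(text))
```

Writing the backtick case as an alternation rather than two optional backticks keeps unbalanced input such as a single leading backtick from matching.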
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
sahnib commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1516471532 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: Review Comment: Should we make this abstract to force user to implement it? ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + // When the python process for python_streaming_source_runner receives one of the + // integers below, it will invoke the corresponding function of StreamReader instance. + val INITIAL_OFFSET_FUNC_ID = 884 + val LATEST_OFFSET_FUNC_ID = 885 + val PARTITIONS_FUNC_ID = 886 + val COMMIT_FUNC_ID = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + private val bufferSize: Int = conf.get(BUFFER_SIZE) + private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + private val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStr
Re: [PR] Miland db/miland legacy error class [spark]
miland-db closed pull request #45423: Miland db/miland legacy error class URL: https://github.com/apache/spark/pull/45423
[PR] Miland db/miland legacy error class [spark]
miland-db opened a new pull request, #45423: URL: https://github.com/apache/spark/pull/45423 ### What changes were proposed in this pull request? In this PR, I propose to assign proper names to the legacy error classes _LEGACY_ERROR_TEMP_324[7-9], and to modify the tests in the test suites to reflect these changes and use the checkError() function. This PR also improves the error messages. ### Why are the changes needed? Proper names improve the user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? Yes, the PR changes a user-facing error message. ### How was this patch tested? Tests already exist. ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]
dbatomic commented on code in PR #45422: URL: https://github.com/apache/spark/pull/45422#discussion_r1516510742 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types.{DataType, StringType} + +object CollationUtils { Review Comment: I usually recommend to stay away from *Utils naming, since the name is not descriptive. Can this file be called `CollationTypeConstraints`? ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types.{DataType, StringType} + +object CollationUtils { + def checkCollationCompatibility( + superCheck: => TypeCheckResult, Review Comment: Why do you need superCheck here? Can you just bail out before calling this check if super check is failure? ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.types.{DataType, StringType} + +object CollationUtils { + def checkCollationCompatibility( + superCheck: => TypeCheckResult, + collationId: Int, + rightDataType: DataType + ): TypeCheckResult = { +val checkResult = superCheck +if (checkResult.isFailure) return checkResult +// Additional check needed for collation compatibility +val rightCollationId: Int = rightDataType.asInstanceOf[St
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
jchen5 commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1516503722 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: > Thinking about it more, why do we handle the count bug in two places that are far away? I wrote a doc about the reason for it and whether we can change it: https://docs.google.com/document/d/1YCce0DtJ6NkMP1QM1nnfYvkiH1CkHoGyMjKQ8MX5bUM/edit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45827][SQL] Move data type checks to CreatableRelationProvider [spark]
cloud-fan closed pull request #45409: [SPARK-45827][SQL] Move data type checks to CreatableRelationProvider URL: https://github.com/apache/spark/pull/45409
Re: [PR] [SPARK-45827][SQL] Move data type checks to CreatableRelationProvider [spark]
cloud-fan commented on PR #45409: URL: https://github.com/apache/spark/pull/45409#issuecomment-1983973892 thanks, merging to master!
[PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]
uros-db opened a new pull request, #45422: URL: https://github.com/apache/spark/pull/45422 ### What changes were proposed in this pull request? ### Why are the changes needed? Currently, all `StringType` arguments passed to built-in string functions in Spark SQL are treated as binary strings. This behaviour is incorrect for almost all collationIds except the default (0), and we should instead warn the user if they try to use an unsupported collation for the given function. Over time, we should implement the appropriate support for these (function, collation) pairs, but until then we should have a way to fail unsupported statements during query analysis. ### Does this PR introduce _any_ user-facing change? Yes, users will now get appropriate errors when they try to use an unsupported collation with a given string function. ### How was this patch tested? Tests in CollationSuite check that these functions work for binary collations and throw exceptions for others. ### Was this patch authored or co-authored using generative AI tooling? Yes.
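The analysis-time failure described in this PR description could look roughly like the sketch below. It is only an illustration, not the PR's code: `CheckResult`, `CollationCheck`, and the `supportedIds` registry are hypothetical names, and the only detail taken from the description is that collation id 0 is the default binary collation.

```java
import java.util.Set;

// Hypothetical stand-in for an analysis-time type check result.
final class CheckResult {
    final boolean success;
    final String message;

    private CheckResult(boolean success, String message) {
        this.success = success;
        this.message = message;
    }

    static CheckResult ok() { return new CheckResult(true, ""); }
    static CheckResult fail(String message) { return new CheckResult(false, message); }
}

final class CollationCheck {
    // Assumption: id 0 is the default binary collation, as the PR description states.
    static final int DEFAULT_COLLATION_ID = 0;

    private CollationCheck() {}

    // supportedIds is a hypothetical per-function list of collations that
    // already have proper support; everything else fails before execution.
    static CheckResult check(String functionName, int collationId, Set<Integer> supportedIds) {
        if (collationId == DEFAULT_COLLATION_ID || supportedIds.contains(collationId)) {
            return CheckResult.ok();
        }
        return CheckResult.fail(
            "Function " + functionName + " does not support collation id " + collationId);
    }
}
```

As support for a (function, collation) pair lands, the pair would be added to the registry and the check would start passing without touching the function itself.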
Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]
MaxGekk commented on code in PR #45405: URL: https://github.com/apache/spark/pull/45405#discussion_r1516378011 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -1096,7 +1096,7 @@ colPosition ; collateClause -: COLLATE collationName=stringLit +: COLLATE collationName=identifier Review Comment: BTW, I would improve the error message to: ``` [COLLATION_INVALID_NAME] `test_collation` does not represent a correct collation name. Suggested valid collation name: UCS_BASIC. SQLSTATE: 42704 ``` and attach a query context like in: ``` spark-sql (default)> select 'aaa' from test-table; [INVALID_IDENTIFIER] The identifier test-table is invalid. Please, consider quoting it with back-quotes as `test-table`. SQLSTATE: 42602 (line 1, pos 22) == SQL == select 'aaa' from test-table --^^^ ``` @dbatomic Could you open a JIRA ticket for that, please?
Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]
MaxGekk commented on code in PR #45405: URL: https://github.com/apache/spark/pull/45405#discussion_r1516415830 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -1096,7 +1096,7 @@ colPosition ; collateClause -: COLLATE collationName=stringLit +: COLLATE collationName=identifier Review Comment: I agree, it would be better to report incompatible types rather than an invalid identifier.
Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]
stefankandic commented on code in PR #45405: URL: https://github.com/apache/spark/pull/45405#discussion_r1516396458 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -1096,7 +1096,7 @@ colPosition ; collateClause -: COLLATE collationName=stringLit +: COLLATE collationName=identifier Review Comment: What about this case: ```code select 'a' collate UNICODE-columnB ``` Shouldn't we report that the types are incompatible for the minus operation?
Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]
yaooqinn commented on code in PR #45418: URL: https://github.com/apache/spark/pull/45418#discussion_r1516398411 ## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala: ## @@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with Logging { */ def canHandle(url : String): Boolean + @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0") + def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + /** * Get the custom datatype mapping for the given jdbc meta information. * @param sqlType The sql type (see java.sql.Types) * @param typeName The sql type name (e.g. "BIGINT UNSIGNED") * @param size The size of the type. * @param md Result metadata associated with this type. + * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not. * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]]) * or null if the default type mapping should be used. */ def getCatalystType( -sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None + sqlType: Int, + typeName: String, + size: Int, + md: MetadataBuilder, + isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, typeName, size, md) Review Comment: It is not a breaking change. It has a default implementation which falls back to the original method. Carrying it in the metadata works, but is not obvious to third-party developers.
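The compatibility argument in this comment can be illustrated with a small sketch of the same overload pattern. Everything here is hypothetical and simplified for illustration: `DialectBase`, `LegacyDialect`, and `NtzAwareDialect` are made-up names, the `MetadataBuilder` parameter is omitted, and type names are plain strings rather than Spark `DataType` objects.

```java
import java.util.Optional;

// Hypothetical base class mirroring the overload pattern in the diff above.
abstract class DialectBase {
    // Old hook, kept (deprecated) so existing third-party subclasses keep working.
    @Deprecated
    Optional<String> getCatalystType(int sqlType, String typeName, int size) {
        return Optional.empty();
    }

    // New hook with the extra flag; by default it delegates to the old one.
    Optional<String> getCatalystType(int sqlType, String typeName, int size,
                                     boolean isTimestampNTZ) {
        return getCatalystType(sqlType, typeName, size);
    }
}

// A dialect written before the new parameter existed: overrides only the old hook.
final class LegacyDialect extends DialectBase {
    @Override
    @SuppressWarnings("deprecation")
    Optional<String> getCatalystType(int sqlType, String typeName, int size) {
        return "BIGINT UNSIGNED".equals(typeName)
            ? Optional.of("DecimalType(20,0)")
            : Optional.empty();
    }
}

// A dialect that opts in to the new hook and uses the flag.
final class NtzAwareDialect extends DialectBase {
    @Override
    Optional<String> getCatalystType(int sqlType, String typeName, int size,
                                     boolean isTimestampNTZ) {
        if ("timestamp".equals(typeName)) {
            return Optional.of(isTimestampNTZ ? "TimestampNTZType" : "TimestampType");
        }
        return Optional.empty();
    }
}
```

A caller that always invokes the new overload still reaches `LegacyDialect`'s override through the default delegation, which is why adding the parameter this way does not break existing dialects.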
Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]
stefankandic commented on code in PR #45405: URL: https://github.com/apache/spark/pull/45405#discussion_r1516396458 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -1096,7 +1096,7 @@ colPosition ; collateClause -: COLLATE collationName=stringLit +: COLLATE collationName=identifier Review Comment: What about this case: ```code select 'a' collate UNICODE-columnB ``` Then the error would be: ```code [INVALID_IDENTIFIER] The identifier UNICODE-columnB is invalid. Please, consider quoting it with back-quotes as `UNICODE-columnB`. SQLSTATE: 42602 ``` instead of an error saying that the types are incompatible for the minus operation.
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1516389365 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) { } public boolean startsWith(final UTF8String prefix, int collationId) { -if (CollationFactory.fetchCollation(collationId).isBinaryCollation) { +if (collationId == CollationFactory.DEFAULT_COLLATION_ID) { return this.startsWith(prefix); } if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) { return this.toLowerCase().startsWith(prefix.toLowerCase()); } -return matchAt(prefix, 0, collationId); +if (prefix.numBytes == 0 || this.numBytes == 0) return prefix.numBytes==0; + +String prefixString = StandardCharsets.UTF_8.decode(prefix.getByteBuffer()).toString(), +patternString = StandardCharsets.UTF_8.decode(this.getByteBuffer()).toString(); + +StringSearch search = new StringSearch(prefixString, Review Comment: As this PR shows: https://github.com/apache/spark/pull/45382 we also shouldn't instantiate `StringSearch` objects outside of `CollationFactory`.
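One way to read this request is that callers should ask a factory-style class for a collation-aware answer instead of building search objects themselves. The sketch below illustrates that encapsulation only. It deliberately swaps ICU's `StringSearch` for the JDK's `java.text.Collator` so that it runs without extra dependencies, and `CollationSupport` is a hypothetical name, not an existing Spark class.

```java
import java.text.Collator;
import java.util.Locale;

// Hypothetical factory-style class: the collator never leaves this class.
final class CollationSupport {
    // Stand-in collator (JDK, not ICU). PRIMARY strength typically ignores
    // case and accent differences.
    private static final Collator COLLATOR = newCollator();

    private static Collator newCollator() {
        Collator c = Collator.getInstance(Locale.ROOT);
        c.setStrength(Collator.PRIMARY);
        return c;
    }

    private CollationSupport() {}

    // True if some leading substring of target is collation-equal to prefix.
    static boolean startsWith(String target, String prefix) {
        if (prefix.isEmpty() || target.isEmpty()) {
            return prefix.isEmpty(); // same empty-input rule as in the diff above
        }
        for (int end = 1; end <= target.length(); end++) {
            if (COLLATOR.compare(target.substring(0, end), prefix) == 0) {
                return true;
            }
        }
        return false;
    }
}
```

The linear scan over leading substrings is only there to keep the example short; the point is that `UTF8String`-like callers would see a boolean-returning method and never a collator or search object.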
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1516391257 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -31,6 +32,8 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.ibm.icu.text.RuleBasedCollator; +import com.ibm.icu.text.StringSearch; Review Comment: (see comment below)
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1516381847 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) { } public boolean startsWith(final UTF8String prefix, int collationId) { -if (CollationFactory.fetchCollation(collationId).isBinaryCollation) { +if (collationId == CollationFactory.DEFAULT_COLLATION_ID) { return this.startsWith(prefix); } if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) { return this.toLowerCase().startsWith(prefix.toLowerCase()); } -return matchAt(prefix, 0, collationId); +if (prefix.numBytes == 0 || this.numBytes == 0) return prefix.numBytes==0; + +String prefixString = StandardCharsets.UTF_8.decode(prefix.getByteBuffer()).toString(), +patternString = StandardCharsets.UTF_8.decode(this.getByteBuffer()).toString(); + +StringSearch search = new StringSearch(prefixString, +new StringCharacterIterator(patternString), +(RuleBasedCollator) CollationFactory.fetchCollation(collationId).collator +); + +return search.first()==0; } public boolean endsWith(final UTF8String suffix) { return matchAt(suffix, numBytes - suffix.numBytes); } public boolean endsWith(final UTF8String suffix, int collationId) { -if (CollationFactory.fetchCollation(collationId).isBinaryCollation) { +if (collationId == CollationFactory.DEFAULT_COLLATION_ID) { Review Comment: (see comment above)
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1516380909 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) { } public boolean startsWith(final UTF8String prefix, int collationId) { -if (CollationFactory.fetchCollation(collationId).isBinaryCollation) { +if (collationId == CollationFactory.DEFAULT_COLLATION_ID) { Review Comment: We shouldn't change the original logic; this part was fine. This PR should focus only on the "third" type of collations (non-binary, non-lowercase).
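The three-way split the reviewer refers to can be sketched as a simple dispatch in which the binary and lowercase branches stay as they were and only the last branch is new. This is an illustration under assumptions: `StartsWithDispatch` and its `Kind` enum are hypothetical, plain `String` stands in for `UTF8String`, and the collation-aware branch is passed in as a function so the sketch does not depend on any particular collation library.

```java
import java.util.Locale;
import java.util.function.BiPredicate;

final class StartsWithDispatch {
    // Hypothetical classification of a collation into the three cases discussed.
    enum Kind { BINARY, LOWERCASE, OTHER }

    private StartsWithDispatch() {}

    static boolean startsWith(String s, String prefix, Kind kind,
                              BiPredicate<String, String> collationAware) {
        switch (kind) {
            case BINARY:
                // Existing fast path, left unchanged.
                return s.startsWith(prefix);
            case LOWERCASE:
                // Existing lowercase path, left unchanged.
                return s.toLowerCase(Locale.ROOT).startsWith(prefix.toLowerCase(Locale.ROOT));
            default:
                // The "third" type: the only branch a change like this would touch.
                return collationAware.test(s, prefix);
        }
    }
}
```

Keeping the first two branches untouched means existing behaviour for binary and lowercase collations cannot regress while the third branch evolves.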
Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]
uros-db commented on code in PR #45421: URL: https://github.com/apache/spark/pull/45421#discussion_r1516379232 ## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java: ## @@ -31,6 +32,8 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.ibm.icu.text.RuleBasedCollator; Review Comment: We should not expose the collator outside of `CollationFactory`.