Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517356903


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+dataOut.flush()
+
+dataIn = new 


Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517356072


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def latestOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "latestOffset"},
+)
+
+def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
+"""
+Returns a list of InputPartition  given the start and end offsets. 
Each InputPartition
+represents a data split that can be processed by one Spark task.
+
+Parameters
+--
+start : dict
+The start offset of the microbatch to plan partitioning.
+end : dict
+The end offset of the microbatch to plan partitioning.
+
+Returns
+---
+Sequence[InputPartition]
+A sequence of partitions for this data source. Each partition value
+must be an instance of `InputPartition` or a subclass of it.
+"""
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "partitions"},
+)
+
+@abstractmethod
+def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]:

Review Comment:
   I copied the interface from the batch Python data source. Why is this Iterator[Union] instead of Union[Iterator]? @HyukjinKwon
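
   For context, a minimal sketch of how the hooks documented in the diff above could fit together in a trivial counter-style source, assuming the DataSourceStreamReader API lands as shown in this PR. The RangePartition class, the offset layout and the output schema are invented for illustration; this is not code from the PR.

   ```python
   from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


   class RangePartition(InputPartition):
       # Hypothetical partition describing the half-open range [start, end).
       def __init__(self, start, end):
           self.start = start
           self.end = end


   class CounterStreamReader(DataSourceStreamReader):
       """Illustrative reader that emits the integers between two offsets."""

       def initialOffset(self) -> dict:
           # Offsets are plain dicts of primitive values, as documented above.
           return {"offset": 0}

       def latestOffset(self) -> dict:
           # A real source would ask the external system how far it can read.
           return {"offset": 10}

       def partitions(self, start: dict, end: dict):
           # One partition covering everything between the two offsets.
           return [RangePartition(start["offset"], end["offset"])]

       def read(self, partition):
           # Yield one tuple per row, matching the source's output schema.
           for i in range(partition.start, partition.end):
               yield (i,)
   ```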






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517351498


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {

Review Comment:
   Could either @rangadi or @WweiL take a look? This is somewhat similar to foreachBatch.






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517348253


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   The IDE doesn't seem to catch this case. Things happen at runtime, as Jungtaek said, so it'd be difficult to prevent this statically.






[PR] [WIP] Issue to fix foreachbatch persist issue for stateful queries [spark]

2024-03-07 Thread via GitHub


anishshri-db opened a new pull request, #45432:
URL: https://github.com/apache/spark/pull/45432

   ### What changes were proposed in this pull request?
   Fix the foreachBatch persist issue for stateful queries
   
   
   ### Why are the changes needed?
   This allows us to prevent stateful operators from reloading state on every foreachBatch invocation
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added unit tests
   
   ```
   [info] Run completed in 20 seconds, 898 milliseconds.
   [info] Total number of tests run: 11
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No





Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45290:
URL: https://github.com/apache/spark/pull/45290#discussion_r1517309692


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase {
 }
   }
 
+  test("aggregates count respects collation") {
+Seq(
+  ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))),

Review Comment:
   cc @dongjoon-hyun too






Re: [PR] [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed` [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45431:
URL: https://github.com/apache/spark/pull/45431#issuecomment-1985167614

   cc @cloud-fan 





[PR] [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed` [spark]

2024-03-07 Thread via GitHub


zhengruifeng opened a new pull request, #45431:
URL: https://github.com/apache/spark/pull/45431

   ### What changes were proposed in this pull request?
   Make `withColumnsRenamed` duplicated column name handling consistent with 
`withColumnRenamed`
   
   
   ### Why are the changes needed?
   `withColumnsRenamed` checks for duplicated column names in the output dataframe, which is not consistent with `withColumnRenamed`:
   1, `withColumnRenamed` doesn't do this check and supports outputting a dataframe with duplicated column names;
   2, when the input dataframe has duplicated column names, `withColumnsRenamed` always fails, even if the columns with the same name are not touched at all:
   
   
   ```
   In [8]: df1 = spark.createDataFrame([(1, "id2"),], ["id", "value"])
  ...: df2 = spark.createDataFrame([(1, 'x', 'id1'), ], ["id", 'a', 
"value"])
  ...: join = df2.join(df1, on=['id'], how='left')
  ...: join
   Out[8]: DataFrame[id: bigint, a: string, value: string, value: string]
   
   In [9]: join.withColumnRenamed('id', 'value')
   Out[9]: DataFrame[value: bigint, a: string, value: string, value: string]
   
   In [10]: join.withColumnsRenamed({'id' : 'value'})
   ...
   AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already 
exists. Choose another name or rename the existing column. SQLSTATE: 42711
   
   In [11]: join.withColumnRenamed('a', 'b')
   Out[11]: DataFrame[id: bigint, b: string, value: string, value: string]
   
   In [12]: join.withColumnsRenamed({'a' : 'b'})
   ...
   AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already 
exists. Choose another name or rename the existing column. SQLSTATE: 42711
   
   In [13]: join.withColumnRenamed('x', 'y')
   Out[13]: DataFrame[id: bigint, a: string, value: string, value: string]
   
   In [14]: join.withColumnsRenamed({'x' : 'y'})
   AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already 
exists. Choose another name or rename the existing column. SQLSTATE: 42711
   
   In [15]: join.withColumnRenamed('value', 'new_value')
   Out[15]: DataFrame[id: bigint, a: string, new_value: string, new_value: 
string]
   
   In [16]: join.withColumnsRenamed({'value' : 'new_value'})
   AnalysisException: [COLUMN_ALREADY_EXISTS] The column `new_value` already 
exists. Choose another name or rename the existing column. SQLSTATE: 42711
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   yes
   
   
   
   
   ### How was this patch tested?
   updated tests
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no





Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on PR #45428:
URL: https://github.com/apache/spark/pull/45428#issuecomment-1985161811

   FOLLOWUP tag should be OK. Thanks for handling this.





Re: [PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]

2024-03-07 Thread via GitHub


HyukjinKwon closed pull request #45429: [SPARK-47079][PYTHON][DOCS][FOLLOWUP] 
Add `VariantType` to API references
URL: https://github.com/apache/spark/pull/45429





Re: [PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45429:
URL: https://github.com/apache/spark/pull/45429#issuecomment-1985159258

   Merged to master.





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1985157244

   Will review sooner rather than later, maybe by today.





Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1517217945


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests to verify operations based on stateful processor 
handle
+ * used primarily in queries based on the `transformWithState` operator.
+ */
+class StatefulProcessorHandleSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+StateStore.stop()
+require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+StateStore.stop()
+require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def keyExprEncoder: ExpressionEncoder[Any] =
+Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
+
+  private def newStoreProviderWithHandle(useColumnFamilies: Boolean):
+RocksDBStateStoreProvider = {
+newStoreProviderWithHandle(StateStoreId(newDir(), Random.nextInt(), 0),
+  numColsPrefixKey = 0,
+  useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithHandle(
+storeId: StateStoreId,
+numColsPrefixKey: Int,
+sqlConf: Option[SQLConf] = None,
+conf: Configuration = new Configuration,
+useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+val provider = new RocksDBStateStoreProvider()
+provider.init(
+  storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = 
numColsPrefixKey,
+  useColumnFamilies,
+  new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+provider
+  }
+
+  private def tryWithProviderResource[T](
+provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+try {
+  f(provider)
+} finally {
+  provider.close()
+}
+  }
+
+  private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
+timeoutMode match {
+  case "NoTimeouts" => TimeoutMode.NoTimeouts()
+  case "ProcessingTime" => TimeoutMode.ProcessingTime()
+  case "EventTime" => TimeoutMode.EventTime()
+  case _ => throw new IllegalArgumentException(s"Invalid 
timeoutMode=$timeoutMode")
+}
+  }
+
+  Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
+test(s"value state creation with timeoutMode=$timeoutMode should succeed") 
{
+  tryWithProviderResource(newStoreProviderWithHandle(true)) { provider =>
+val store = provider.getStore(0)
+val handle = new StatefulProcessorHandleImpl(store,
+  UUID.randomUUID(), keyExprEncoder, getTimeoutMode(timeoutMode))
+assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
+handle.getValueState[Long]("testState")
+  }
+}
+  }
+
+  private def verifyInvalidOperation(
+  handle: StatefulProcessorHandleImpl,
+  handleState: StatefulProcessorHandleState.Value,
+  errorMsg: String)(fn: StatefulProcessorHandleImpl => Unit): Unit = {
+handle.setHandleState(handleState)
+assert(handle.getHandleState === handleState)
+val ex = intercept[Exception] {
+  fn(handle)
+}
+assert(ex.getMessage.contains(errorMsg))
+  }
+
+  private def createValueStateInstance(handle: StatefulProcessorHandleImpl): 
Unit = {
+handle.getValueState[Long]("testState")
+  }
+
+  private def registerTimer(handle: 

Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on code in PR #45290:
URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase {
 }
   }
 
+  test("aggregates count respects collation") {
+Seq(
+  ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))),

Review Comment:
   This test case failed in the daily test of Maven + Java 21, both on Linux 
and MacOS. @dbatomic Do you have time to investigate the cause of this failure?
   
   - linux: 
https://github.com/apache/spark/actions/runs/8189363924/job/22416511124
   
   ```
   - aggregates count respects collation *** FAILED ***
 Exception thrown while executing query:
 == Parsed Logical Plan ==
 CTE [t]
 :  +- 'SubqueryAlias t
 : +- 'Project ['collate('col1, unicode_CI) AS c#427560]
 :+- 'UnresolvedInlineTable [col1], [[AAA], [aaa]]
 +- 'Aggregate ['c], [unresolvedalias('COUNT(1)), 'c]
+- 'UnresolvedRelation [t], [], false
 
 == Analyzed Logical Plan ==
 count(1): bigint, c: string COLLATE 'UNICODE_CI'
 WithCTE
 :- CTERelationDef 106, false
 :  +- SubqueryAlias t
 : +- Project [collate(col1#427562, unicode_CI) AS c#427560]
 :+- LocalRelation [col1#427562]
 +- Aggregate [c#427560], [count(1) AS count(1)#427563L, c#427560]
+- SubqueryAlias t
   +- CTERelationRef 106, true, [c#427560], false
 
 == Optimized Logical Plan ==
 Aggregate [c#427560], [count(1) AS count(1)#427563L, c#427560]
 +- LocalRelation [c#427560]
 
 == Physical Plan ==
 AdaptiveSparkPlan isFinalPlan=false
 +- == Current Plan ==
SortAggregate(key=[c#427560], functions=[count(1)], 
output=[count(1)#427563L, c#427560])
+- Sort [c#427560 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 0
  +- Exchange hashpartitioning(c#427560, 5), ENSURE_REQUIREMENTS, 
[plan_id=435997]
 +- SortAggregate(key=[c#427560], functions=[partial_count(1)], 
output=[c#427560, count#427567L])
+- *(1) Sort [c#427560 ASC NULLS FIRST], false, 0
   +- *(1) LocalTableScan [c#427560]
 +- == Initial Plan ==
SortAggregate(key=[c#427560], functions=[count(1)], 
output=[count(1)#427563L, c#427560])
+- Sort [c#427560 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c#427560, 5), ENSURE_REQUIREMENTS, 
[plan_id=435931]
  +- SortAggregate(key=[c#427560], functions=[partial_count(1)], 
output=[c#427560, count#427567L])
 +- Sort [c#427560 ASC NULLS FIRST], false, 0
+- LocalTableScan [c#427560]
 
 == Exception ==
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 
in stage 393.0 failed 1 times, most recent failure: Lost task 1.0 in stage 
393.0 (TID 394) (localhost executor driver): 
java.lang.StringIndexOutOfBoundsException: Index 3 out of bounds for length 3
at 
java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:55)
at 
java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:52)
at 
java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:213)
at 
java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:210)
at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:98)
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
at java.base/java.lang.String.checkIndex(String.java:4832)
at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:46)
at java.base/java.lang.String.charAt(String.java:1555)
at 
com.ibm.icu.impl.coll.UTF16CollationIterator.handleNextCE32(UTF16CollationIterator.java:107)
at 
com.ibm.icu.impl.coll.CollationIterator.nextCE(CollationIterator.java:247)
at 
com.ibm.icu.impl.coll.CollationKeys.writeSortKeyUpToQuaternary(CollationKeys.java:374)
at 
com.ibm.icu.text.RuleBasedCollator.writeSortKey(RuleBasedCollator.java:1159)
at 
com.ibm.icu.text.RuleBasedCollator.getRawCollationKey(RuleBasedCollator.java:1146)
at 
com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1071)
at 
com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1064)
at 
org.apache.spark.sql.catalyst.util.CollationFactory$Collation.lambda$new$2(CollationFactory.java:104)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$8(ShuffleExchangeExec.scala:330)
at 

Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on code in PR #45290:
URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase {
 }
   }
 
+  test("aggregates count respects collation") {
+Seq(
+  ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))),

Review Comment:
   This test case failed in the daily test of Maven + Java 21, both on Linux 
and MacOS. @dbatomic Do you have time to investigate the cause of this failure?
   
   - linux: 
https://github.com/apache/spark/actions/runs/8189363924/job/22416511124
   
   
![image](https://github.com/apache/spark/assets/1475305/2b39b2d2-a4b2-45ec-8bd9-42c36f22a0b9)
   
   
   - macos-14: 
https://github.com/apache/spark/actions/runs/8194082205/job/22416483724
   
   ```
   - aggregates count respects collation *** FAILED ***
 Exception thrown while executing query:
 == Parsed Logical Plan ==
 CTE [t]
 :  +- 'SubqueryAlias t
 : +- 'Project ['collate('col1, unicode_CI) AS c#427467]
 :+- 'UnresolvedInlineTable [col1], [[aaa], [aaa]]
 +- 'Aggregate ['c], [unresolvedalias('COUNT(1)), 'c]
+- 'UnresolvedRelation [t], [], false
 
 == Analyzed Logical Plan ==
 count(1): bigint, c: string COLLATE 'UNICODE_CI'
 WithCTE
 :- CTERelationDef 103, false
 :  +- SubqueryAlias t
 : +- Project [collate(col1#427469, unicode_CI) AS c#427467]
 :+- LocalRelation [col1#427469]
 +- Aggregate [c#427467], [count(1) AS count(1)#427470L, c#427467]
+- SubqueryAlias t
   +- CTERelationRef 103, true, [c#427467], false
 
 == Optimized Logical Plan ==
 Aggregate [c#427467], [count(1) AS count(1)#427470L, c#427467]
 +- LocalRelation [c#427467]
 
 == Physical Plan ==
 AdaptiveSparkPlan isFinalPlan=false
 +- == Current Plan ==
SortAggregate(key=[c#427467], functions=[count(1)], 
output=[count(1)#427470L, c#427467])
+- Sort [c#427467 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 0
  +- Exchange hashpartitioning(c#427467, 5), ENSURE_REQUIREMENTS, 
[plan_id=435580]
 +- SortAggregate(key=[c#427467], functions=[partial_count(1)], 
output=[c#427467, count#427474L])
+- *(1) Sort [c#427467 ASC NULLS FIRST], false, 0
   +- *(1) LocalTableScan [c#427467]
 +- == Initial Plan ==
SortAggregate(key=[c#427467], functions=[count(1)], 
output=[count(1)#427470L, c#427467])
+- Sort [c#427467 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c#427467, 5), ENSURE_REQUIREMENTS, 
[plan_id=435514]
  +- SortAggregate(key=[c#427467], functions=[partial_count(1)], 
output=[c#427467, count#427474L])
 +- Sort [c#427467 ASC NULLS FIRST], false, 0
+- LocalTableScan [c#427467]
 
 == Exception ==
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 387.0 failed 1 times, most recent failure: Lost task 0.0 in stage 
387.0 (TID 387) (localhost executor driver): 
java.lang.StringIndexOutOfBoundsException: Index 4 out of bounds for length 3
at 
java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:55)
at 
java.base/jdk.internal.util.Preconditions$1.apply(Preconditions.java:52)
at 
java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:213)
at 
java.base/jdk.internal.util.Preconditions$4.apply(Preconditions.java:210)
at 
java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:98)
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
at 
java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
at java.base/java.lang.String.checkIndex(String.java:4832)
at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:46)
at java.base/java.lang.String.charAt(String.java:1555)
at 
com.ibm.icu.impl.coll.UTF16CollationIterator.handleNextCE32(UTF16CollationIterator.java:107)
at 
com.ibm.icu.impl.coll.CollationIterator.nextCE(CollationIterator.java:247)
at 
com.ibm.icu.impl.coll.CollationKeys.writeSortKeyUpToQuaternary(CollationKeys.java:374)
at 
com.ibm.icu.text.RuleBasedCollator.writeSortKey(RuleBasedCollator.java:1159)
at 
com.ibm.icu.text.RuleBasedCollator.getRawCollationKey(RuleBasedCollator.java:1146)
at 
com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1071)
at 
com.ibm.icu.text.RuleBasedCollator.getCollationKey(RuleBasedCollator.java:1064)
at 
org.apache.spark.sql.catalyst.util.CollationFactory$Collation.lambda$new$2(CollationFactory.java:104)
at 

Re: [PR] [SPARK-46834][SQL][Collations] Support for aggregates [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on code in PR #45290:
URL: https://github.com/apache/spark/pull/45290#discussion_r1517243024


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -183,6 +185,57 @@ class CollationSuite extends DatasourceV2SQLBase {
 }
   }
 
+  test("aggregates count respects collation") {
+Seq(
+  ("ucs_basic", Seq("AAA", "aaa"), Seq(Row(1, "AAA"), Row(1, "aaa"))),

Review Comment:
   This test case failed in the daily test of Maven + Java 21, both on Linux 
and MacOS. @dbatomic Do you have time to investigate the cause of this failure?
   
   - linux: 
https://github.com/apache/spark/actions/runs/8189363924/job/22416511124
   
   
![image](https://github.com/apache/spark/assets/1475305/2b39b2d2-a4b2-45ec-8bd9-42c36f22a0b9)
   
   
   - macos-14: 
https://github.com/apache/spark/actions/runs/8194082205/job/22416483724
   
   
![image](https://github.com/apache/spark/assets/1475305/280c413b-71d6-4715-80a6-1f5ae27a5097)
   
   also cc @HyukjinKwon because I noticed that you are paying attention to the 
tests of maven + Java 21 on macos-14






Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45422:
URL: https://github.com/apache/spark/pull/45422#discussion_r1517236308


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.{DataType, StringType}
+
+object CollationUtils {
+  def checkCollationCompatibility(
+   superCheck: => TypeCheckResult,

Review Comment:
   I could bail out for each function individually, but I think it's essentially the same thing; only this way, there is less copy-paste across the codebase.
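
   To make the trade-off concrete, here is a small self-contained sketch of the shared-check pattern being discussed. The types below are simplified stand-ins, not Spark's actual TypeCheckResult machinery, and all names are invented; only the by-name parent check mirrors the signature visible in the diff above.

   ```scala
   // Simplified stand-ins for illustration only; not Spark's real classes.
   sealed trait CheckResult
   case object Success extends CheckResult
   case class Mismatch(reason: String) extends CheckResult

   object CollationChecks {
     // The parent check is passed by name (as in the diff above) and runs first,
     // so the collation restriction lives in exactly one place.
     def checkCollationCompatibility(
         superCheck: => CheckResult,
         collationName: String,
         supportsNonBinaryCollation: Boolean): CheckResult = {
       superCheck match {
         case Success if !supportsNonBinaryCollation && collationName != "UCS_BASIC" =>
           Mismatch(s"collation $collationName is not supported by this function")
         case other => other
       }
     }
   }
   ```

   Each expression's type check would then be a one-line call into the helper instead of repeating the collation branch in every function.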






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1517229022


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -410,7 +412,9 @@ public boolean endsWith(final UTF8String suffix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().endsWith(suffix.toLowerCase());
 }
-return matchAt(suffix, numBytes - suffix.numBytes, collationId);
+if (suffix.numBytes == 0 || this.numBytes == 0) return suffix.numBytes==0;
+
+return CollationFactory.getStringSearch(this, suffix, 
collationId).last()==this.numChars()-suffix.numChars();

Review Comment:
   for clarity, I think we can separate this out into `private boolean 
collatedEndsWith(...)`
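
   For illustration, the suggested extraction could look roughly like the following; the helper name comes from the comment above and the body just lifts the expression already shown in the diff, so treat it as a sketch rather than the PR's final code.

   ```java
   // Hypothetical private helper inside UTF8String, extracted from the inlined
   // expression shown in the diff above.
   private boolean collatedEndsWith(final UTF8String suffix, int collationId) {
     // An empty suffix matches any string; a non-empty suffix never matches an empty string.
     if (suffix.numBytes == 0 || this.numBytes == 0) {
       return suffix.numBytes == 0;
     }
     // The suffix matches iff the last StringSearch hit ends exactly at the end of this string.
     return CollationFactory.getStringSearch(this, suffix, collationId).last()
         == this.numChars() - suffix.numChars();
   }
   ```

   endsWith would then end with a single `return collatedEndsWith(suffix, collationId);` after its existing special cases, and startsWith could get a matching counterpart.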






[PR] [SPARK-47079][PYTHON][DOCS][FOLLOWUP] Add `VariantType` to API references [spark]

2024-03-07 Thread via GitHub


zhengruifeng opened a new pull request, #45429:
URL: https://github.com/apache/spark/pull/45429

   ### What changes were proposed in this pull request?
   Add `VariantType` to API references
   
   
   ### Why are the changes needed?
   `VariantType` has been added in `__all__` in `types`
   
   
   ### Does this PR introduce _any_ user-facing change?
   yes
   
   ### How was this patch tested?
   ci
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no





Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1517226969


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +396,9 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);

Review Comment:
   if there's code in `UTF8String` that's no longer used, we should delete it 
(for example, the overloaded `matchAt`)






Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support complex types of data [spark]

2024-03-07 Thread via GitHub


panbingkun commented on PR #44665:
URL: https://github.com/apache/spark/pull/44665#issuecomment-1985106638

   Friendly ping @HyukjinKwon: when you are not busy, could you please continue to help review this PR?





Re: [PR] [SPARK-46510][CORE] Spark shell log filter should be applied to all AbstractAppender [spark]

2024-03-07 Thread via GitHub


AngersZh closed pull request #44496: [SPARK-46510][CORE] Spark shell log 
filter should be applied to all AbstractAppender
URL: https://github.com/apache/spark/pull/44496





Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1517209325


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   Python will always fail at runtime, so it's really a question of which error message is easier to understand...
   
   But, I don't know, the IDE could come to the rescue. @HyukjinKwon will the IDE be able to point it out if we define this as abstract and an implementation class does not implement it?
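
   For illustration, a small self-contained sketch of the difference being discussed, using plain abc semantics rather than this PR's code: an abstract method turns the mistake into an instantiation-time TypeError (which type checkers and IDEs can flag), while a raise-in-body default only fails when the method is actually called.

   ```python
   from abc import ABC, abstractmethod


   class AbstractReader(ABC):
       @abstractmethod
       def initialOffset(self) -> dict:
           ...


   class RaisingReader:
       def initialOffset(self) -> dict:
           # Mirrors the PR's approach: forgetting to override only surfaces at call time.
           raise NotImplementedError("initialOffset is not implemented")


   class ForgotAbstract(AbstractReader):   # forgot to implement initialOffset
       pass


   class ForgotRaising(RaisingReader):     # forgot to override initialOffset
       pass


   try:
       ForgotAbstract()                    # fails immediately at instantiation
   except TypeError as e:
       print("abc:", e)

   reader = ForgotRaising()                # instantiation succeeds silently
   try:
       reader.initialOffset()              # only fails here, at call time
   except NotImplementedError as e:
       print("runtime:", e)
   ```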






Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on PR #45428:
URL: https://github.com/apache/spark/pull/45428#issuecomment-1985100729

   Thanks @HyukjinKwon 





Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on PR #45428:
URL: https://github.com/apache/spark/pull/45428#issuecomment-1985099843

   also cc @dongjoon-hyun 





Re: [PR] [SPARK-47305][SQL][TESTS][FOLLOWUP][3.4] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on PR #45428:
URL: https://github.com/apache/spark/pull/45428#issuecomment-1985094500

   This is my first time handling such a situation: is it better to create a new JIRA, or is it better as a FOLLOWUP of SPARK-47305?
   
   cc @HyukjinKwon @HeartSaVioR @zhengruifeng 





[PR] [SPARK-47305][SQL][TESTS] Fix the compilation error related to `PropagateEmptyRelationSuite` [spark]

2024-03-07 Thread via GitHub


LuciferYang opened a new pull request, #45428:
URL: https://github.com/apache/spark/pull/45428

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1517119556


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Class used to provide access to expired timer's expiry time and timeout 
mode. These values

Review Comment:
   nit: Technically the timeout mode is not visible through the trait. If that's
intentional, we should probably remove that part from the interface doc.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode}
+
+/**
+ * Class that provides a concrete implementation that can be used to provide 
access to expired
+ * timer's expiry time and timeout mode. These values are only relevant if the 
ExpiredTimerInfo

Review Comment:
   nit: same, timeout mode is not visible to user function AFAIK.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
 val keyObj = getKeyObj(keyRow)  // convert key to objects
 ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
 val valueObjIter = valueRowIter.map(getValueObj.apply)
-val mappedIterator = statefulProcessor.handleInputRows(keyObj, 
valueObjIter,
-  new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForLateEvents)).map { obj =>
+val mappedIterator = statefulProcessor.handleInputRows(
+  keyObj,
+  valueObjIter,
+  new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),
+  new ExpiredTimerInfoImpl(false)

Review Comment:
   super nit / 2 cents: a named parameter for the boolean would give much better
readability in a non-IDE environment. It's really more of a general suggestion
and preference, so you can ignore it.
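
   (For illustration, a minimal sketch of the named-argument style being suggested; the function and parameter names below are hypothetical, not the actual `ExpiredTimerInfoImpl` signature.)

   ```scala
   // Hypothetical example: `isValid` is an assumed parameter name.
   def describeTimer(isValid: Boolean): String =
     if (isValid) "expired timer info available" else "no expired timer"

   describeTimer(false)           // a bare boolean forces the reader to look up the signature
   describeTimer(isValid = false) // a named argument documents the intent at the call site
   ```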



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -121,6 +123,46 @@ class StatefulProcessorHandleImpl(
 
   override def getQueryInfo(): QueryInfo = currQueryInfo
 
+  private def getTimerState[T](): TimerStateImpl[T] = {
+new TimerStateImpl[T](store, timeoutMode, keyEncoder)
+  }
+
+  private val timerState = getTimerState[Boolean]()
+
+  override def registerTimer(expiryTimestampMs: Long): Unit = {
+if (!(timeoutMode == ProcessingTime || timeoutMode == EventTime)) {
+  throw 
StateStoreErrors.cannotUseTimersWithInvalidTimeoutMode(timeoutMode.toString)
+}
+
+if (!(currState == INITIALIZED || currState == DATA_PROCESSED)) {
+  throw 
StateStoreErrors.cannotUseTimersWithInvalidHandleState(currState.toString)
+}
+
+if (timerState.exists(expiryTimestampMs)) {
+  logWarning(s"Timer already exists for 
expiryTimestampMs=$expiryTimestampMs")

Review Comment:
   I'm OK with moving the logging to TimerStateImpl if it helps to solve this.




Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on PR #45424:
URL: https://github.com/apache/spark/pull/45424#issuecomment-1985053630

   Thanks, merged to master





Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


yaooqinn closed pull request #45424: [SPARK-47319][SQL] Improve missingInput 
calculation
URL: https://github.com/apache/spark/pull/45424





Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on code in PR #45418:
URL: https://github.com/apache/spark/pull/45418#discussion_r1517168161


##
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##
@@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with 
Logging {
*/
   def canHandle(url : String): Boolean
 
+  @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0")
+  def getCatalystType(
+  sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+
   /**
* Get the custom datatype mapping for the given jdbc meta information.
* @param sqlType The sql type (see java.sql.Types)
* @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
* @param size The size of the type.
* @param md Result metadata associated with this type.
+   * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not.
* @return The actual DataType (subclasses of 
[[org.apache.spark.sql.types.DataType]])
* or null if the default type mapping should be used.
*/
   def getCatalystType(
-sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+  sqlType: Int,
+  typeName: String,
+  size: Int,
+  md: MetadataBuilder,
+  isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, 
typeName, size, md)

Review Comment:
   Comments addressed; please take a look again when you are available.






Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on code in PR #45418:
URL: https://github.com/apache/spark/pull/45418#discussion_r1517155629


##
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##
@@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with 
Logging {
*/
   def canHandle(url : String): Boolean
 
+  @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0")
+  def getCatalystType(
+  sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+
   /**
* Get the custom datatype mapping for the given jdbc meta information.
* @param sqlType The sql type (see java.sql.Types)
* @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
* @param size The size of the type.
* @param md Result metadata associated with this type.
+   * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not.
* @return The actual DataType (subclasses of 
[[org.apache.spark.sql.types.DataType]])
* or null if the default type mapping should be used.
*/
   def getCatalystType(
-sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+  sqlType: Int,
+  typeName: String,
+  size: Int,
+  md: MetadataBuilder,
+  isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, 
typeName, size, md)

Review Comment:
   Indeed, the precision is in the method parameters, and the scale is in the 
meta:)






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1984994353

   Is this good to go? @HeartSaVioR @rangadi 





Re: [PR] [SPARK-47265][SQL][TESTS] Replace `createTable(..., schema: StructType, ...)` with `createTable(..., columns: Array[Column], ...)` in UT [spark]

2024-03-07 Thread via GitHub


LuciferYang commented on code in PR #45368:
URL: https://github.com/apache/spark/pull/45368#discussion_r1517123366


##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala:
##
@@ -84,28 +85,28 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 invalidatedTables.add(ident)
   }
 
-  // TODO: remove it when no tests calling this deprecated method.
+  // TODO: remove it when the deprecated method `createTable(..., StructType, 
...)`

Review Comment:
   I suggest adding some new comments to explain the reason for retaining this
API and throwing exceptions by default, while removing the `TODO`.






Re: [PR] [SPARK-47265][SQL][TESTS] Replace `createTable(..., schema: StructType, ...)` with `createTable(..., columns: Array[Column], ...)` in UT [spark]

2024-03-07 Thread via GitHub


cloud-fan commented on code in PR #45368:
URL: https://github.com/apache/spark/pull/45368#discussion_r1517121687


##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala:
##
@@ -84,28 +85,28 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 invalidatedTables.add(ident)
   }
 
-  // TODO: remove it when no tests calling this deprecated method.
+  // TODO: remove it when the deprecated method `createTable(..., StructType, 
...)`

Review Comment:
   I think we can remove the TODO now. We should never remove the API, and
implementations will have to provide a fake implementation that just fails.






Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]

2024-03-07 Thread via GitHub


cloud-fan commented on code in PR #45418:
URL: https://github.com/apache/spark/pull/45418#discussion_r1517121075


##
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##
@@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with 
Logging {
*/
   def canHandle(url : String): Boolean
 
+  @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0")
+  def getCatalystType(
+  sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+
   /**
* Get the custom datatype mapping for the given jdbc meta information.
* @param sqlType The sql type (see java.sql.Types)
* @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
* @param size The size of the type.
* @param md Result metadata associated with this type.
+   * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not.
* @return The actual DataType (subclasses of 
[[org.apache.spark.sql.types.DataType]])
* or null if the default type mapping should be used.
*/
   def getCatalystType(
-sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+  sqlType: Int,
+  typeName: String,
+  size: Int,
+  md: MetadataBuilder,
+  isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, 
typeName, size, md)

Review Comment:
   I feel this is not a good API design. Are we going to keep deprecating and 
adding new overloads with extra parameters every time we want to pass more 
information? AFAIK we are already using `MetadataBuilder` to carry decimal 
precision information.
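
   (For illustration, a rough sketch of the alternative hinted at here: carry the extra hint through the metadata the existing signature already receives instead of adding an overload. The key name below is an assumption, not an established convention.)

   ```scala
   import org.apache.spark.sql.types.MetadataBuilder

   // Caller side: put the hint into the metadata handed to getCatalystType.
   val builder = new MetadataBuilder()
     .putBoolean("isTimestampNTZ", true) // assumed key name, illustrative only
     .putLong("scale", 0L)

   // Dialect side: read the hint back without a new method overload.
   val meta = builder.build()
   val preferNTZ = meta.contains("isTimestampNTZ") && meta.getBoolean("isTimestampNTZ")
   ```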






Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


cloud-fan commented on code in PR #45424:
URL: https://github.com/apache/spark/pull/45424#discussion_r1517119767


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala:
##
@@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: 
mutable.LinkedHashSet[Attribute
* in `other`.
*/
   def --(other: Iterable[NamedExpression]): AttributeSet = {

Review Comment:
   This can be more efficient, but looks weird in standard collection APIs such 
as `def --`






Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-07 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1517085186


##
python/pyspark/resource/tests/test_connect_resources.py:
##
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
+from pyspark.sql import SparkSession
+
+
+class ResourceProfileTests(unittest.TestCase):
+def test_profile_before_sc_for_connect(self):
+rpb = ResourceProfileBuilder()
+treqs = TaskResourceRequests().cpus(2)
+# no exception for building ResourceProfile
+rp = rpb.require(treqs).build

Review Comment:
   Yes, Added the checking in the test.






Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-07 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1517085041


##
python/pyspark/resource/tests/test_connect_resources.py:
##
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
+from pyspark.sql import SparkSession
+
+
+class ResourceProfileTests(unittest.TestCase):
+def test_profile_before_sc_for_connect(self):

Review Comment:
   Added the error checking.






Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-07 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1517084819


##
python/pyspark/resource/profile.py:
##
@@ -114,14 +122,23 @@ def id(self) -> int:
 int
 A unique id of this :class:`ResourceProfile`
 """
+with self._lock:
+if self._id is None:
+if self._java_resource_profile is not None:
+self._id = self._java_resource_profile.id()
+else:
+from pyspark.sql import is_remote
 
-if self._java_resource_profile is not None:
-return self._java_resource_profile.id()
-else:
-raise RuntimeError(
-"SparkContext must be created to get the id, get the id "
-"after adding the ResourceProfile to an RDD"
-)
+if is_remote():
+from pyspark.sql.connect.resource.profile import 
ResourceProfile
+
+rp = ResourceProfile(

Review Comment:
   Done to add a comment






Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-07 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1517084721


##
python/pyspark/resource/profile.py:
##
@@ -114,14 +122,23 @@ def id(self) -> int:
 int
 A unique id of this :class:`ResourceProfile`
 """
+with self._lock:
+if self._id is None:
+if self._java_resource_profile is not None:
+self._id = self._java_resource_profile.id()
+else:
+from pyspark.sql import is_remote
 
-if self._java_resource_profile is not None:
-return self._java_resource_profile.id()
-else:
-raise RuntimeError(
-"SparkContext must be created to get the id, get the id "
-"after adding the ResourceProfile to an RDD"
-)
+if is_remote():
+from pyspark.sql.connect.resource.profile import 
ResourceProfile
+
+rp = ResourceProfile(

Review Comment:
   Yes, you're right. For remote mode, the ResourceProfile is just like a proxy
for the Spark ResourceProfile on the server side.






Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1984930529

   As the Spark community didn't get any issue reports during the v3.3.0 - v3.5.1
releases, I think this is a corner case. Maybe we can make the config internal.





Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]

2024-03-07 Thread via GitHub


dongjoon-hyun commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1984929745

   +1 for the direction if we need to support both.





Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1984926315

   Thank you @dongjoon-hyun. 
   
   In such circumstances, I guess we can add a configuration for base64 classes 
to avoid breaking things again. AFAIK, Apache Hive also uses the JDK version, 
and I think the majority of Spark users talk to Hive heavily using Spark SQL.





Re: [PR] [SPARK-47314][DOC] Remove the wrong comment line of `ExternalSorter#writePartitionedMapOutput` method [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on PR #45415:
URL: https://github.com/apache/spark/pull/45415#issuecomment-1984919818

   Thanks @zwangsheng, merged to master





Re: [PR] [SPARK-47314][DOC] Remove the wrong comment line of `ExternalSorter#writePartitionedMapOutput` method [spark]

2024-03-07 Thread via GitHub


yaooqinn closed pull request #45415: [SPARK-47314][DOC] Remove the wrong
comment line of `ExternalSorter#writePartitionedMapOutput` method
URL: https://github.com/apache/spark/pull/45415





Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on PR #45427:
URL: https://github.com/apache/spark/pull/45427#issuecomment-1984918192

   Late +1





Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]

2024-03-07 Thread via GitHub


HyukjinKwon closed pull request #45427: [MINOR][INFRA] Make "y/n" consistent 
within merge script
URL: https://github.com/apache/spark/pull/45427





Re: [PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45427:
URL: https://github.com/apache/spark/pull/45427#issuecomment-1984911418

   Merged to master.





Re: [PR] [SPARK-47314][DOC] Correct the `ExternalSorter#writePartitionedMapOutput` method comment [spark]

2024-03-07 Thread via GitHub


zwangsheng commented on code in PR #45415:
URL: https://github.com/apache/spark/pull/45415#discussion_r1517066704


##
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala:
##
@@ -690,7 +690,7 @@ private[spark] class ExternalSorter[K, V, C](
* Write all the data added into this ExternalSorter into a map output 
writer that pushes bytes
* to some arbitrary backing store. This is called by the SortShuffleWriter.
*
-   * @return array of lengths, in bytes, of each partition of the file (used 
by map output tracker)
+   * Update the partition's length, when call partition pair writer to close.

Review Comment:
   OK






[PR] [MINOR][INFRA] Make "y/n" consistent within merge script [spark]

2024-03-07 Thread via GitHub


HyukjinKwon opened a new pull request, #45427:
URL: https://github.com/apache/spark/pull/45427

   ### What changes were proposed in this pull request?
   
   This PR changes the y/n message and condition consistent within merging 
script.
   
   ### Why are the changes needed?
   
   For consistency.
   
   ```
   Would you like to use the modified body? (y/N): y
   ...
   Proceed with merging pull request #45426? (y/N): y
   ...
   Merge complete (local ref PR_TOOL_MERGE_PR_45426_MASTER). Push to apache? 
(y/N): y
   ...
   Would you like to pick 9cac2bb6 into another branch? (y/N): n
   ...
   Would you like to update an associated JIRA? (y/N): y
   ...
   Check if the JIRA information is as expected (Y/n): y  # <-- Inconsistent.
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, dev-only.
   
   ### How was this patch tested?
   
   Manually tested.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-46992]Fix cache consistence [spark]

2024-03-07 Thread via GitHub


doki23 commented on PR #45181:
URL: https://github.com/apache/spark/pull/45181#issuecomment-1984850287

   > All children have to be considered for changes of their persistence state. 
Currently it only checks the first found child. For clarity there is a test 
which fails: [doki23#1](https://github.com/doki23/spark/pull/1)
   
   So, we need a cache state signature for queryExecution





Re: [PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]

2024-03-07 Thread via GitHub


HyukjinKwon closed pull request #45426: [SPARK-47309][SQL][XML] Fix schema 
inference issues in XML
URL: https://github.com/apache/spark/pull/45426





Re: [PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45426:
URL: https://github.com/apache/spark/pull/45426#issuecomment-1984850009

   Merged to master.





Re: [PR] [SPARK-47078][DOCS][PYTHON] Documentation for SparkSession-based Profilers [spark]

2024-03-07 Thread via GitHub


HyukjinKwon closed pull request #45269: [SPARK-47078][DOCS][PYTHON] 
Documentation for SparkSession-based Profilers
URL: https://github.com/apache/spark/pull/45269





Re: [PR] [SPARK-47078][DOCS][PYTHON] Documentation for SparkSession-based Profilers [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45269:
URL: https://github.com/apache/spark/pull/45269#issuecomment-1984848824

   Merged to master.





Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on code in PR #45422:
URL: https://github.com/apache/spark/pull/45422#discussion_r1517022572


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.{DataType, StringType}
+
+object CollationUtils {
+  def checkCollationCompatibility(
+   superCheck: => TypeCheckResult,
+   collationId: Int,
+   rightDataType: DataType
+ ): TypeCheckResult = {
+val checkResult = superCheck
+if (checkResult.isFailure) return checkResult
+// Additional check needed for collation compatibility
+val rightCollationId: Int = 
rightDataType.asInstanceOf[StringType].collationId
+if (collationId != rightCollationId) {
+  return DataTypeMismatch(
+errorSubClass = "COLLATION_MISMATCH",
+messageParameters = Map(
+  "collationNameLeft" -> 
CollationFactory.fetchCollation(collationId).collationName,
+  "collationNameRight" -> 
CollationFactory.fetchCollation(rightCollationId).collationName
+)
+  )
+}
+TypeCheckResult.TypeCheckSuccess
+  }
+
+  final val SUPPORT_BINARY_ONLY: Int = 0
+  final val SUPPORT_LOWERCASE: Int = 1
+  final val SUPPORT_ALL_COLLATIONS: Int = 2
+
+  def checkCollationSupport(

Review Comment:
   Let's keep the indentation consistent with 2/4 spaces
(https://github.com/databricks/scala-style-guide)
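
   (For illustration, a tiny sketch of that convention: wrapped declaration parameters get 4 spaces, method bodies get 2. The body below is a placeholder, not the real implementation.)

   ```scala
   def checkCollationSupport(
       collationId: Int,
       functionName: String): Boolean = {
     collationId >= 0 && functionName.nonEmpty // placeholder body for illustration
   }
   ```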






Re: [PR] Miland db/miland legacy error class [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45423:
URL: https://github.com/apache/spark/pull/45423#issuecomment-1984835207

   See also https://spark.apache.org/contributing.html





Re: [PR] Miland db/miland legacy error class [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45423:
URL: https://github.com/apache/spark/pull/45423#issuecomment-1984834978

   Mind filing a JIRA and linking it to the PR title please?





Re: [PR] [SPARK-42746][SQL] Add the LISTAGG() aggregate function [spark]

2024-03-07 Thread via GitHub


github-actions[bot] closed pull request #42398: [SPARK-42746][SQL] Add the 
LISTAGG() aggregate function
URL: https://github.com/apache/spark/pull/42398





Re: [PR] [SPARK-46034][CORE] SparkContext add file should also copy file to local root path [spark]

2024-03-07 Thread via GitHub


github-actions[bot] commented on PR #43936:
URL: https://github.com/apache/spark/pull/43936#issuecomment-1984826472

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





Re: [PR] [SPARK-46071][SQL] Optimize CaseWhen toJSON content [spark]

2024-03-07 Thread via GitHub


github-actions[bot] commented on PR #43979:
URL: https://github.com/apache/spark/pull/43979#issuecomment-1984826451

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[PR] [SPARK-47309][SQL][XML] Fix schema inference issues in XML [spark]

2024-03-07 Thread via GitHub


shujingyang-db opened a new pull request, #45426:
URL: https://github.com/apache/spark/pull/45426

   
   
   ### What changes were proposed in this pull request?
   
   This PR fixes XML schema inference issues:
   
   1. when there's an empty tag
   
   2. when merging schema for NullType
   
   ### Why are the changes needed?
   
   Fix a bug
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes
   
   ### How was this patch tested?
   
   Unit tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No





Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]

2024-03-07 Thread via GitHub


xinrong-meng commented on PR #45378:
URL: https://github.com/apache/spark/pull/45378#issuecomment-1984523232

   Merged to master, thank you all!





Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]

2024-03-07 Thread via GitHub


xinrong-meng closed pull request #45378: [SPARK-47276][PYTHON][CONNECT] 
Introduce `spark.profile.clear` for SparkSession-based profiling
URL: https://github.com/apache/spark/pull/45378





Re: [PR] [SPARK-47307] Replace RFC 2045 base64 encoder with RFC 4648 encoder [spark]

2024-03-07 Thread via GitHub


dongjoon-hyun commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1984433848

   Thank you for the confirmation, @ted-jenks. Well, in this case, it's too
late to change the behavior again. Apache Spark 3.3 has been EOL since last year,
and I don't think we need to change the behavior for Apache Spark 3.4.3 and 3.5.2,
because the Apache Spark community didn't have such an official contract before.
It would have been great if you had participated in the Apache Spark 3.3.0 RC
votes at that time.
   
   > > It sounds like you have other systems to read Spark's data.
   >
   > Correct. The issue was that from 3.2 to 3.3 there was a behavior change in 
the base64 encodings used in spark. Previously, they did not chunk. Now, they 
do. Chunked base64 cannot be read by non-MIME compatible base64 decoders 
causing the data output by Spark to be corrupt to systems following the normal 
base64 standard.
   > 
   > I think the best path forward is to use MIME encoding/decoding without 
chunking as this is the most fault tolerant meaning existing use-cases will not 
break, but the pre 3.3 base64 behavior is upheld.
   
   However, I understand and agree with @ted-jenks's point as a nice-to-have
for Apache Spark 4+ officially. In other words, if we want to merge this PR, we
need to make it official from Apache Spark 4.0.0 and protect it as a kind of
developer interface for all future releases. Do you think that's okay,
@ted-jenks?
   
   BTW, how do you think about this proposal, @yaooqinn (the original author of 
#35110) and @cloud-fan and @HyukjinKwon ?
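
   (For reference, a minimal sketch of the three behaviors under discussion using the JDK's `java.util.Base64`; this is an illustration only, not a claim about which calls Spark actually uses.)

   ```scala
   import java.util.Base64

   val bytes = ("spark" * 20).getBytes("UTF-8") // long enough to trigger MIME chunking

   // RFC 4648 "basic" encoding: a single line, what most non-MIME decoders expect.
   val basic = Base64.getEncoder.encodeToString(bytes)

   // RFC 2045 (MIME) encoding with the default 76-character chunking and CRLF separators.
   val chunked = Base64.getMimeEncoder.encodeToString(bytes)

   // MIME encoder with chunking disabled (non-positive line length): the
   // "MIME without chunking" compromise mentioned above.
   val mimeNoChunk = Base64.getMimeEncoder(-1, Array.empty[Byte]).encodeToString(bytes)
   ```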





Re: [PR] [SPARK-47311][SQL][PYTHON] Suppress Python exceptions where PySpark is not in the Python path [spark]

2024-03-07 Thread via GitHub


xinrong-meng commented on PR #45414:
URL: https://github.com/apache/spark/pull/45414#issuecomment-1984375769

   Looks nice, thank you!





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-07 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516770647


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+
plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   Why is it O(n^2)? RewriteWithExpression would replace all WITH expressions
in one pass when we first call it, wouldn't it? (I know that technically it is
also called as part of the optimization loop in OptimizeSubqueries, but by that
point all the WITH expressions are replaced.)






Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]

2024-03-07 Thread via GitHub


xinrong-meng commented on code in PR #45378:
URL: https://github.com/apache/spark/pull/45378#discussion_r1516752307


##
python/pyspark/sql/tests/test_session.py:
##
@@ -531,6 +531,33 @@ def test_dump_invalid_type(self):
 },
 )
 
+def test_clear_memory_type(self):

Review Comment:
   Good idea!
   
   For now, all logic tested by SparkSessionProfileTests is directly imported 
in Spark Connect with no modification. But I do agree separating it later will 
improve readability and ensure future parity. I'll refactor later. Thanks!






Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]

2024-03-07 Thread via GitHub


ueshin commented on code in PR #45378:
URL: https://github.com/apache/spark/pull/45378#discussion_r1516750441


##
python/pyspark/sql/profiler.py:
##
@@ -224,6 +224,54 @@ def dump(id: int) -> None:
 for id in sorted(code_map.keys()):
 dump(id)
 
+def clear_perf_profiles(self, id: Optional[int] = None) -> None:
+"""
+Clear the perf profile results.
+
+.. versionadded:: 4.0.0

Review Comment:
   Actually this is not. The `clear` in `Profile` should be a user-facing API.






Re: [PR] [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling [spark]

2024-03-07 Thread via GitHub


xinrong-meng commented on code in PR #45378:
URL: https://github.com/apache/spark/pull/45378#discussion_r1516752307


##
python/pyspark/sql/tests/test_session.py:
##
@@ -531,6 +531,33 @@ def test_dump_invalid_type(self):
 },
 )
 
+def test_clear_memory_type(self):

Review Comment:
   Good idea!
   
   For now, all logic tested by SparkSessionProfileTests is directly imported 
in Spark Connect with no modification. But I do agree separating it later will 
improve readability. I'll refactor later. Thanks!






[PR] [SPARK-47318][Security] Adds HKDF round to AuthEngine key derivation [spark]

2024-03-07 Thread via GitHub


sweisdb opened a new pull request, #45425:
URL: https://github.com/apache/spark/pull/45425

   ### What changes were proposed in this pull request?
   
   This change adds an additional pass through a key derivation function (KDF) 
to the key exchange protocol in `AuthEngine`. Currently, it uses the shared 
secret from a bespoke key negotiation protocol directly. This is an encoded X 
coordinate on the X25519 curve. It is atypical and not recommended to use that 
coordinate directly as a key, but rather to pass it to a KDF.
   
   Note, Spark now supports TLS for RPC calls. It is preferable to use that 
rather than the bespoke AES RPC encryption implemented by `AuthEngine` and 
`TransportCipher`.
   
   ### Why are the changes needed?
   
   This follows best practices of key negotiation protocols. The encoded X 
coordinate is not guaranteed to be uniformly distributed over the 32-byte key 
space. Rather, we pass it through a HKDF function to map it uniformly to a 
16-byte key space.
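
   (For illustration, a minimal single-block HKDF-SHA256 sketch of the extract-and-expand pattern described above (RFC 5869); the names are hypothetical and this is not the actual `AuthEngine` code.)

   ```scala
   import javax.crypto.Mac
   import javax.crypto.spec.SecretKeySpec

   object HkdfSketch {
     private def hmacSha256(key: Array[Byte], data: Array[Byte]): Array[Byte] = {
       val mac = Mac.getInstance("HmacSHA256")
       mac.init(new SecretKeySpec(key, "HmacSHA256"))
       mac.doFinal(data)
     }

     def deriveKey(sharedSecret: Array[Byte], salt: Array[Byte],
         info: Array[Byte], keyLen: Int): Array[Byte] = {
       require(keyLen <= 32, "single expand block shown for brevity")
       val prk = hmacSha256(salt, sharedSecret)       // extract
       val okm = hmacSha256(prk, info :+ 0x01.toByte) // expand, first block only
       okm.take(keyLen)                               // e.g. 16 bytes for an AES-128 key
     }
   }
   ```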
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests under:
   `build/sbt "network-common/test:testOnly"`
   
   Specifically:
   `build/sbt "network-common/test:testOnly 
org.apache.spark.network.crypto.AuthEngineSuite"`
   `build/sbt "network-common/test:testOnly 
org.apache.spark.network.crypto.AuthIntegrationSuite"`
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


attilapiros commented on code in PR #45424:
URL: https://github.com/apache/spark/pull/45424#discussion_r1516669562


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala:
##
@@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: 
mutable.LinkedHashSet[Attribute
* in `other`.
*/
   def --(other: Iterable[NamedExpression]): AttributeSet = {

Review Comment:
   and then we can save here what in the `missingInput()` was saved in your 
previous commit (the calculation of the `inputSet`)






Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


attilapiros commented on code in PR #45424:
URL: https://github.com/apache/spark/pull/45424#discussion_r1516651884


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala:
##
@@ -104,13 +104,19 @@ class AttributeSet private (private val baseSet: 
mutable.LinkedHashSet[Attribute
* in `other`.
*/
   def --(other: Iterable[NamedExpression]): AttributeSet = {

Review Comment:
   @peter-toth  What about changing the `other` to a call-by-name parameter?
   ```suggestion
 def --(other: => Iterable[NamedExpression]): AttributeSet = {
   ```
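
   With a by-name parameter the argument is only evaluated when the method body 
actually uses it, so callers can skip building an expensive argument entirely. A 
tiny, self-contained illustration (toy types, not Spark's API):
   
   ```scala
   // Toy sketch: `other` is evaluated lazily, so when baseSet is empty the
   // (possibly expensive) argument is never computed at all.
   class Demo(baseSet: Set[Int]) {
     def --(other: => Iterable[Int]): Set[Int] =
       if (baseSet.isEmpty) Set.empty   // `other` never evaluated on this path
       else baseSet -- other            // evaluated only when actually needed
   }
   ```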






Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


peter-toth commented on PR #45424:
URL: https://github.com/apache/spark/pull/45424#issuecomment-1984153122

   @cloud-fan can you please take a look?





Re: [PR] [SPARK-47319][SQL] Improve missingInput calculation [spark]

2024-03-07 Thread via GitHub


attilapiros commented on PR #45424:
URL: https://github.com/apache/spark/pull/45424#issuecomment-1984150861

   LGTM
   
   I talked to @peter-toth offline; the improvement comes from not calculating 
the `inputSet` at all when `references` is empty.
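
   In other words, roughly the following shape (a sketch of the idea only, not 
the exact patch; `references`, `inputSet` and `producedAttributes` are the 
existing `QueryPlan` members):
   
   ```scala
   // Skip the expensive union of all children's outputs (inputSet) when there
   // are no references to resolve in the first place.
   def missingInput: AttributeSet =
     if (references.isEmpty) AttributeSet.empty
     else references -- inputSet -- producedAttributes
   ```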





Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1516609093


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)

Review Comment:
   Do we need to set this config? 
`workerFactory.createSimpleWorker(blockingMode = true)` already avoids daemon 
mode.
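
   i.e. presumably the init could be reduced to something like the sketch below 
(same calls as in the diff, just without flipping `PYTHON_USE_DAEMON`; whether 
that is safe for all code paths is the open question):
   
   ```scala
   val workerFactory =
     new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
   // createSimpleWorker spawns a dedicated, non-daemon worker for this runner.
   val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)
   pythonWorker = Some(worker)
   pythonWorkerFactory = Some(workerFactory)
   ```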






Re: [PR] [SPARK-37932][SQL]Wait to resolve missing attributes before applying DeduplicateRelations [spark]

2024-03-07 Thread via GitHub


peter-toth commented on PR #35684:
URL: https://github.com/apache/spark/pull/35684#issuecomment-1984107426

   @martinf-moodys, 
[SPARK-47319](https://issues.apache.org/jira/browse/SPARK-47319) / 
https://github.com/apache/spark/pull/45424 might help, especially if you have 
many `Union` nodes





[PR] [SPARK-47319][SQL] Fix missingInput calculation [spark]

2024-03-07 Thread via GitHub


peter-toth opened a new pull request, #45424:
URL: https://github.com/apache/spark/pull/45424

   ### What changes were proposed in this pull request?
   This PR speeds up `QueryPlan.missingInput()` calculation.
   
   
   ### Why are the changes needed?
   This seems to be the root cause of `DeduplicateRelations` slowness in some 
cases.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Existing UTs.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   





Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


stefankandic commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516535896


##
sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala:
##
@@ -117,7 +117,7 @@ object DataType {
   private val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r
   private val CHAR_TYPE = """char\(\s*(\d+)\s*\)""".r
   private val VARCHAR_TYPE = """varchar\(\s*(\d+)\s*\)""".r
-  private val COLLATED_STRING_TYPE = """string\s+COLLATE\s+'([\w_]+)'""".r
+  private val COLLATED_STRING_TYPE = """string\s+COLLATE\s+([\w_]+)""".r

Review Comment:
   Nice catch! Since `collate_key_word_as_identifier` is false, I guess we only 
need to support backticks. I added tests for them.
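
   For illustration, a backtick-tolerant variant of the pattern could look like 
this (hypothetical; the actual pattern and tests are in the PR, and a stricter 
regex would require the backticks to be balanced):
   
   ```scala
   // Accept an optionally back-quoted collation name, e.g. string COLLATE `UNICODE_CI`
   private val COLLATED_STRING_TYPE = """string\s+COLLATE\s+`?([\w_]+)`?""".r
   ```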






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-07 Thread via GitHub


sahnib commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1516471532


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:

Review Comment:
   Should we make this abstract to force users to implement it?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  // When the python process for python_streaming_source_runner receives one 
of the
+  // integers below, it will invoke the corresponding function of StreamReader 
instance.
+  val INITIAL_OFFSET_FUNC_ID = 884
+  val LATEST_OFFSET_FUNC_ID = 885
+  val PARTITIONS_FUNC_ID = 886
+  val COMMIT_FUNC_ID = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  private val bufferSize: Int = conf.get(BUFFER_SIZE)
+  private val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  private val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  

Re: [PR] Miland db/miland legacy error class [spark]

2024-03-07 Thread via GitHub


miland-db closed pull request #45423: Miland db/miland legacy error class
URL: https://github.com/apache/spark/pull/45423





[PR] Miland db/miland legacy error class [spark]

2024-03-07 Thread via GitHub


miland-db opened a new pull request, #45423:
URL: https://github.com/apache/spark/pull/45423

   ### What changes were proposed in this pull request?
   In this PR, I propose to assign proper names to the legacy error classes 
_LEGACY_ERROR_TEMP_324[7-9], and to modify the affected test suites to reflect 
these changes and use the checkError() function. This PR also improves the error 
messages.
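
   For context, migrating an assertion to checkError() typically looks roughly 
like the snippet below, run inside a Spark SQL test suite (the error class name, 
parameters and SQL statement here are made up for illustration):
   
   ```scala
   // Hypothetical example: the error class, parameters and query are illustrative only.
   val e = intercept[AnalysisException] {
     sql("ALTER TABLE t SET SERDE 'some.serde.Class'")
   }
   checkError(
     exception = e,
     errorClass = "UNSUPPORTED_FEATURE.SET_SERDE",
     parameters = Map("tableName" -> "`t`"))
   ```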
   
   ### Why are the changes needed?
   Proper names improve the user experience with Spark SQL.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, the PR changes a user-facing error message.
   
   ### How was this patch tested?
   Tests already exist.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No





Re: [PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]

2024-03-07 Thread via GitHub


dbatomic commented on code in PR #45422:
URL: https://github.com/apache/spark/pull/45422#discussion_r1516510742


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.{DataType, StringType}
+
+object CollationUtils {

Review Comment:
   I usually recommend staying away from `*Utils` naming, since the name is not 
descriptive.
   
   Could this file be called `CollationTypeConstraints` instead?



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.{DataType, StringType}
+
+object CollationUtils {
+  def checkCollationCompatibility(
+   superCheck: => TypeCheckResult,

Review Comment:
   Why do you need superCheck here? Can you just bail out before calling this 
check if the super check fails?
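
   i.e. something like the following shape at the call site (a sketch under the 
assumption that the caller is an expression overriding checkInputDataTypes; not 
the actual patch):
   
   ```scala
   override def checkInputDataTypes(): TypeCheckResult = {
     val defaultCheck = super.checkInputDataTypes()
     if (defaultCheck.isFailure) {
       defaultCheck  // bail out here, so the helper no longer needs a by-name superCheck
     } else {
       CollationUtils.checkCollationCompatibility(collationId, right.dataType)
     }
   }
   ```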



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationUtils.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.types.{DataType, StringType}
+
+object CollationUtils {
+  def checkCollationCompatibility(
+   superCheck: => TypeCheckResult,
+   collationId: Int,
+   rightDataType: DataType
+ ): TypeCheckResult = {
+val checkResult = superCheck
+if (checkResult.isFailure) return checkResult
+// Additional check needed for collation compatibility
+val rightCollationId: Int = 

Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-07 Thread via GitHub


jchen5 commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516503722


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+
plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   > Thinking about it more, why do we handle the count bug in two places that 
are far away?
   
   I wrote a doc about the reason for it and whether we can change it: 
https://docs.google.com/document/d/1YCce0DtJ6NkMP1QM1nnfYvkiH1CkHoGyMjKQ8MX5bUM/edit






Re: [PR] [SPARK-45827][SQL] Move data type checks to CreatableRelationProvider [spark]

2024-03-07 Thread via GitHub


cloud-fan closed pull request #45409: [SPARK-45827][SQL] Move data type checks 
to CreatableRelationProvider
URL: https://github.com/apache/spark/pull/45409





Re: [PR] [SPARK-45827][SQL] Move data type checks to CreatableRelationProvider [spark]

2024-03-07 Thread via GitHub


cloud-fan commented on PR #45409:
URL: https://github.com/apache/spark/pull/45409#issuecomment-1983973892

   thanks, merging to master!





[PR] [SPARK-47296][SQL][COLLATION] Fail unsupported functions for non-binary collations [spark]

2024-03-07 Thread via GitHub


uros-db opened a new pull request, #45422:
URL: https://github.com/apache/spark/pull/45422

   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   Currently, all `StringType` arguments passed to built-in string functions in 
Spark SQL are treated as binary strings. This behaviour is incorrect for almost 
all collation IDs other than the default (0), so we should instead warn users 
when they try to use an unsupported collation with a given function. Over time 
we should implement proper support for these (function, collation) pairs, but 
until then we need a way to fail unsupported statements during query analysis.
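
   Conceptually, the analysis-time guard amounts to something like the sketch 
below (the helper name and the error sub-class are illustrative assumptions, not 
the exact code in this PR):
   
   ```scala
   // Reject non-binary collations for functions that have no collation-aware
   // implementation yet; the names below are illustrative.
   def checkBinaryCollation(dt: DataType, functionName: String): TypeCheckResult = dt match {
     case st: StringType if !CollationFactory.fetchCollation(st.collationId).isBinaryCollation =>
       DataTypeMismatch(
         errorSubClass = "UNSUPPORTED_COLLATION",   // hypothetical sub-class
         messageParameters = Map("functionName" -> functionName))
     case _ => TypeCheckResult.TypeCheckSuccess
   }
   ```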
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, users will now get appropriate errors when they try to use an 
unsupported collation with a given string function.
   
   
   ### How was this patch tested?
   Tests in CollationSuite check that these functions work for binary collations 
and throw exceptions for others.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Yes.





Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


MaxGekk commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516378011


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1096,7 +1096,7 @@ colPosition
 ;
 
 collateClause
-: COLLATE collationName=stringLit
+: COLLATE collationName=identifier

Review Comment:
   BTW, I would improve the error message to:
   ```
   [COLLATION_INVALID_NAME] `test_collation` does not represent a correct 
collation name. Suggested valid collation name: UCS_BASIC. SQLSTATE: 42704
   ```
   and attach a query context like in:
   ```
   spark-sql (default)> select 'aaa' from test-table;
   
   [INVALID_IDENTIFIER] The identifier test-table is invalid. Please, consider 
quoting it with back-quotes as `test-table`. SQLSTATE: 42602 (line 1, pos 22)
   
   == SQL ==
   select 'aaa' from test-table
   --^^^
   ```
   @dbatomic Could you open an JIRA for that, please.






Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


MaxGekk commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516415830


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1096,7 +1096,7 @@ colPosition
 ;
 
 collateClause
-: COLLATE collationName=stringLit
+: COLLATE collationName=identifier

Review Comment:
   I agree, it would be better to report incompatible types rather than an 
invalid identifier.






Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


stefankandic commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516396458


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1096,7 +1096,7 @@ colPosition
 ;
 
 collateClause
-: COLLATE collationName=stringLit
+: COLLATE collationName=identifier

Review Comment:
   what about this case:
   ```code
   select 'a' collate UNICODE-columnB
   ```
   
   shouldn't we report that the types are incompatible for minus operation?






Re: [PR] [SPARK-47316][SQL] Fix TimestampNTZ in Postgres Array [spark]

2024-03-07 Thread via GitHub


yaooqinn commented on code in PR #45418:
URL: https://github.com/apache/spark/pull/45418#discussion_r1516398411


##
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala:
##
@@ -87,17 +87,26 @@ abstract class JdbcDialect extends Serializable with 
Logging {
*/
   def canHandle(url : String): Boolean
 
+  @deprecated("Implement getCatalystType with isTimestampNTZ instead", "4.0.0")
+  def getCatalystType(
+  sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+
   /**
* Get the custom datatype mapping for the given jdbc meta information.
* @param sqlType The sql type (see java.sql.Types)
* @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
* @param size The size of the type.
* @param md Result metadata associated with this type.
+   * @param isTimestampNTZ Use TIMESTAMP_NTZ type or not.
* @return The actual DataType (subclasses of 
[[org.apache.spark.sql.types.DataType]])
* or null if the default type mapping should be used.
*/
   def getCatalystType(
-sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = None
+  sqlType: Int,
+  typeName: String,
+  size: Int,
+  md: MetadataBuilder,
+  isTimestampNTZ: Boolean): Option[DataType] = getCatalystType(sqlType, 
typeName, size, md)

Review Comment:
   It is not a breaking change. It has a default implementation which falls back 
to the original method.
   
   Carrying it in the metadata works, but it is not obvious to third-party 
developers.






Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


stefankandic commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516396458


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1096,7 +1096,7 @@ colPosition
 ;
 
 collateClause
-: COLLATE collationName=stringLit
+: COLLATE collationName=identifier

Review Comment:
   what about this case:
   ```code
   select 'a' collate UNICODE-columnB
   ```
   
   then the error would be:
   ```code
   [INVALID_IDENTIFIER] The identifier UNICODE-columnB is invalid. Please, 
consider quoting it with back-quotes as `UNICODE-columnB`. SQLSTATE: 42602
   ```
   
   instead of giving the error that types are incompatible for minus operation






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1516389365


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) {
   }
 
   public boolean startsWith(final UTF8String prefix, int collationId) {
-if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+if (collationId == CollationFactory.DEFAULT_COLLATION_ID) {
   return this.startsWith(prefix);
 }
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+if (prefix.numBytes == 0 || this.numBytes == 0) return prefix.numBytes==0;
+
+String prefixString = 
StandardCharsets.UTF_8.decode(prefix.getByteBuffer()).toString(),
+patternString = 
StandardCharsets.UTF_8.decode(this.getByteBuffer()).toString();
+
+StringSearch search = new StringSearch(prefixString,

Review Comment:
   referring to this PR: https://github.com/apache/spark/pull/45382
   you can see that we also shouldn't instantiate StringSearch objects outside 
of CollationFactory
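
   i.e. the ICU pieces would stay behind a factory helper, along the lines of 
the sketch below (the helper name `getStringSearch` is an assumption; the 
constructor call mirrors the one in this diff):
   
   ```scala
   // Inside CollationFactory (sketch): callers never touch the ICU collator directly.
   import java.text.StringCharacterIterator
   import com.ibm.icu.text.{RuleBasedCollator, StringSearch}
   
   def getStringSearch(pattern: String, target: String, collationId: Int): StringSearch =
     new StringSearch(
       pattern,
       new StringCharacterIterator(target),
       fetchCollation(collationId).collator.asInstanceOf[RuleBasedCollator])
   ```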






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1516391257


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -31,6 +32,8 @@
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
+import com.ibm.icu.text.RuleBasedCollator;
+import com.ibm.icu.text.StringSearch;

Review Comment:
   (see comment below)






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1516381847


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) {
   }
 
   public boolean startsWith(final UTF8String prefix, int collationId) {
-if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+if (collationId == CollationFactory.DEFAULT_COLLATION_ID) {
   return this.startsWith(prefix);
 }
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+if (prefix.numBytes == 0 || this.numBytes == 0) return prefix.numBytes==0;
+
+String prefixString = 
StandardCharsets.UTF_8.decode(prefix.getByteBuffer()).toString(),
+patternString = 
StandardCharsets.UTF_8.decode(this.getByteBuffer()).toString();
+
+StringSearch search = new StringSearch(prefixString,
+new StringCharacterIterator(patternString),
+(RuleBasedCollator) 
CollationFactory.fetchCollation(collationId).collator
+);
+
+return search.first()==0;
   }
 
   public boolean endsWith(final UTF8String suffix) {
 return matchAt(suffix, numBytes - suffix.numBytes);
   }
 
   public boolean endsWith(final UTF8String suffix, int collationId) {
-if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+if (collationId == CollationFactory.DEFAULT_COLLATION_ID) {

Review Comment:
   (see comment above)






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1516380909


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -384,27 +387,47 @@ public boolean startsWith(final UTF8String prefix) {
   }
 
   public boolean startsWith(final UTF8String prefix, int collationId) {
-if (CollationFactory.fetchCollation(collationId).isBinaryCollation) {
+if (collationId == CollationFactory.DEFAULT_COLLATION_ID) {

Review Comment:
   we shouldn't make changes to the original logic; this part was fine.
   In this PR we should only focus on the "third" type of collations 
(non-binary, non-lowercase).
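
   i.e. keep the existing fast paths and only add the ICU-backed branch for the 
remaining collations, roughly like this Scala rendering of the intended 
structure (the `getStringSearch` helper is a hypothetical CollationFactory 
method, not the current API):
   
   ```scala
   def startsWith(target: UTF8String, prefix: UTF8String, collationId: Int): Boolean = {
     val collation = CollationFactory.fetchCollation(collationId)
     if (collation.isBinaryCollation) {
       target.startsWith(prefix)                              // unchanged fast path
     } else if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
       target.toLowerCase.startsWith(prefix.toLowerCase)      // unchanged fast path
     } else if (prefix.numBytes == 0 || target.numBytes == 0) {
       prefix.numBytes == 0
     } else {
       // collation-aware path: the prefix must match at position 0 of the target
       val search = CollationFactory.getStringSearch(prefix.toString, target.toString, collationId)
       search.first() == 0
     }
   }
   ```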






Re: [PR] [SPARK-47295] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-07 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1516379232


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -31,6 +32,8 @@
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
+import com.ibm.icu.text.RuleBasedCollator;

Review Comment:
   we should not expose collator outside of `CollationFactory`






Re: [PR] [SPARK-47302][SQL][Collation] Collate keyword as identifier [spark]

2024-03-07 Thread via GitHub


MaxGekk commented on code in PR #45405:
URL: https://github.com/apache/spark/pull/45405#discussion_r1516378011


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1096,7 +1096,7 @@ colPosition
 ;
 
 collateClause
-: COLLATE collationName=stringLit
+: COLLATE collationName=identifier

Review Comment:
   BTW, I would improve the error message to:
   ```
   [COLLATION_INVALID_NAME] `test_collation` does not represent a correct 
collation name. Suggested valid collation name: UCS_BASIC. SQLSTATE: 42704
   
   ```





