Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-11 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1519232536


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -892,6 +893,9 @@ message MapPartitions {
 
   // (Optional) Whether to use barrier mode execution or not.
   optional bool is_barrier = 3;
+
+  // (Optional) ResourceProfile id used for the stage level scheduling.
+  optional int32 profile_id = 4;

Review Comment:
   Hi @grundprinzip, The user still needs to know the exact ResourceProfile id, 
if we attach resource profile in the call, seems we can't get id in this call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-11 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1519232917


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -1011,5 +1039,7 @@ service SparkConnectService {
 
   // FetchErrorDetails retrieves the matched exception with details based on a 
provided error id.
   rpc FetchErrorDetails(FetchErrorDetailsRequest) returns 
(FetchErrorDetailsResponse) {}
-}
 
+  // Build ResourceProfile and get the profile id
+  rpc BuildResourceProfile(BuildResourceProfileRequest) returns 
(BuildResourceProfileResponse) {}

Review Comment:
   Hi @grundprinzip, Really good suggestion, just made the newest commit to 
move it to the command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-11 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1519235937


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectBuildResourceProfileHandler.scala:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.jdk.CollectionConverters.MapHasAsScala
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, 
TaskResourceProfile, TaskResourceRequest}
+
+class SparkConnectBuildResourceProfileHandler(
+responseObserver: StreamObserver[proto.BuildResourceProfileResponse])
+extends Logging {
+
+  /**
+   * transform the spark connect ResourceProfile to spark ResourceProfile
+   * @param rp
+   *   Spark connect ResourceProfile
+   * @return
+   *   the Spark ResourceProfile
+   */
+  private def transformResourceProfile(rp: proto.ResourceProfile): 
ResourceProfile = {
+val ereqs = rp.getExecutorResourcesMap.asScala.map { case (name, res) =>
+  name -> new ExecutorResourceRequest(
+res.getResourceName,
+res.getAmount,
+res.getDiscoveryScript,
+res.getVendor)
+}.toMap
+val treqs = rp.getTaskResourcesMap.asScala.map { case (name, res) =>
+  name -> new TaskResourceRequest(res.getResourceName, res.getAmount)
+}.toMap
+
+if (ereqs.isEmpty) {
+  new TaskResourceProfile(treqs)
+} else {
+  new ResourceProfile(ereqs, treqs)
+}
+  }
+
+  def handle(request: proto.BuildResourceProfileRequest): Unit = {
+val holder = SparkConnectService
+  .getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getSessionId)
+
+val rp = transformResourceProfile(request.getProfile)
+
+val session = holder.session
+session.sparkContext.resourceProfileManager.addResourceProfile(rp)

Review Comment:
   Yeah, Both ResourceProfile and ResourceProfileManager don't have the 
cleanup. If you think we need to cleanup, we can file another PR for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-11 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1519241310


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -892,6 +893,9 @@ message MapPartitions {
 
   // (Optional) Whether to use barrier mode execution or not.
   optional bool is_barrier = 3;
+
+  // (Optional) ResourceProfile id used for the stage level scheduling.
+  optional int32 profile_id = 4;

Review Comment:
   Hi @grundprinzip,
   
   I'm not quite following you about "by uuids".
   
   So the basic implementation is
   
   1. The client creates ResourceProfile
   2. if the profile ID of ResourceProfile is accessed for the first time, then 
the client will ask to create ResourceProfile and add it to the 
ResourceProfileManager on the server side, and the server side will return the 
profile ID to the client which will set the id to the ResourceProfile on the 
client side.
   3. The internal mapInPandas/mapInArrow will just use the ResourceProfile id, 
and the server side can extract the ResourceProfile from ResourceProfileManager 
according to the id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-03-11 Thread via GitHub


wbo4958 commented on code in PR #45232:
URL: https://github.com/apache/spark/pull/45232#discussion_r1519241310


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -892,6 +893,9 @@ message MapPartitions {
 
   // (Optional) Whether to use barrier mode execution or not.
   optional bool is_barrier = 3;
+
+  // (Optional) ResourceProfile id used for the stage level scheduling.
+  optional int32 profile_id = 4;

Review Comment:
   I'm not quite following you about "by uuids".
   
   So the basic implementation is
   
   1. The client creates ResourceProfile
   2. if the profile ID of ResourceProfile is accessed for the first time, then 
the client will ask to create ResourceProfile and add it to the 
ResourceProfileManager on the server side, and the server side will return the 
profile ID to the client which will set the id to the ResourceProfile on the 
client side.
   3. The internal mapInPandas/mapInArrow will just use the ResourceProfile id, 
and the server side can extract the ResourceProfile from ResourceProfileManager 
according to the id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support some types of data [spark]

2024-03-11 Thread via GitHub


panbingkun commented on code in PR #44665:
URL: https://github.com/apache/spark/pull/44665#discussion_r1519249360


##
python/pyspark/sql/functions/builtin.py:
##
@@ -15534,19 +15532,7 @@ def to_csv(col: "ColumnOrName", options: 
Optional[Dict[str, str]] = None) -> Col
 |  2,Alice|
 +-+
 
-Example 2: Converting a complex StructType to a CSV string
-
->>> from pyspark.sql import Row, functions as sf
->>> data = [(1, Row(age=2, name='Alice', scores=[100, 200, 300]))]
->>> df = spark.createDataFrame(data, ("key", "value"))
->>> df.select(sf.to_csv(df.value)).show(truncate=False) # doctest: +SKIP
-+---+
-|to_csv(value)  |
-+---+
-|2,Alice,"[100,200,300]"|

Review Comment:
   I have updated the document `sql-migration-guide.md`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519251919


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.types._
+import org.apache.spark.util.NextIterator
+
+/**
+ * Singleton utils class used primarily while interacting with TimerState
+ */
+object TimerStateUtils {
+  case class TimestampWithKey(
+  key: Any,
+  expiryTimestampMs: Long) extends Serializable
+
+  val PROC_TIMERS_STATE_NAME = "_procTimers"
+  val EVENT_TIMERS_STATE_NAME = "_eventTimers"
+  val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
+  val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
+}
+
+/**
+ * Class that provides the implementation for storing timers
+ * used within the `transformWithState` operator.
+ * @param store - state store to be used for storing timer data
+ * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param keyExprEnc - encoder for key expression
+ * @tparam S - type of timer value
+ */
+class TimerStateImpl[S](
+store: StateStore,
+timeoutMode: TimeoutMode,
+keyExprEnc: ExpressionEncoder[Any]) extends Logging {
+
+  private val EMPTY_ROW =
+
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
+
+  private val schemaForPrefixKey: StructType = new StructType()
+.add("key", BinaryType)
+
+  private val schemaForKeyRow: StructType = new StructType()
+.add("key", BinaryType)
+.add("expiryTimestampMs", LongType, nullable = false)
+
+  private val keySchemaForSecIndex: StructType = new StructType()
+.add("expiryTimestampMs", BinaryType, nullable = false)
+.add("key", BinaryType)
+
+  private val schemaForValueRow: StructType =
+StructType(Array(StructField("__dummy__", NullType)))
+
+  private val keySerializer = keyExprEnc.createSerializer()
+
+  private val prefixKeyEncoder = UnsafeProjection.create(schemaForPrefixKey)
+
+  private val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+
+  private val secIndexKeyEncoder = 
UnsafeProjection.create(keySchemaForSecIndex)
+
+  val timerCfName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+TimerStateUtils.PROC_TIMERS_STATE_NAME
+  } else {
+TimerStateUtils.EVENT_TIMERS_STATE_NAME
+  }
+
+  val keyToTsCFName = timerCfName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
+  store.createColFamilyIfAbsent(keyToTsCFName,
+schemaForKeyRow, numColsPrefixKey = 1,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  val tsToKeyCFName = timerCfName + TimerStateUtils.TIMESTAMP_TO_KEY_CF
+  store.createColFamilyIfAbsent(tsToKeyCFName,
+keySchemaForSecIndex, numColsPrefixKey = 0,
+schemaForValueRow, useMultipleValuesPerKey = false,
+isInternal = true)
+
+  private def encodeKey(expiryTimestampMs: Long): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (!keyOption.isDefined) {
+  throw StateStoreErrors.implicitKeyNotFound(keyToTsCFName)
+}
+
+val keyByteArr = 
keySerializer.apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+val keyRow = keyEncoder(InternalRow(keyByteArr, expiryTimestampMs))
+keyRow
+  }
+
+  //  We maintain a secondary index that inverts the ordering of the timestamp
+  //  and grouping key and maintains the list of (expiry) timestamps in sorted 
order
+  //  (using BIG_ENDIAN encoding) within RocksDB.
+  //  This is because RocksDB uses byte-wise comparison using the default 
comparator to
+  //  determine sorted order of keys. This is used to read expired timers at 
any given
+  //  processing time/event time timestamp threshold by performing a range 
scan.

Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support some types of data [spark]

2024-03-11 Thread via GitHub


panbingkun commented on code in PR #44665:
URL: https://github.com/apache/spark/pull/44665#discussion_r1519252452


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala:
##
@@ -260,16 +263,36 @@ case class StructsToCsv(
   child = child,
   timeZoneId = None)
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+child.dataType match {
+  case schema: StructType if schema.map(_.dataType).forall(
+dt => supportDataType(dt)) => TypeCheckSuccess
+  case _: StructType => DataTypeMismatch(errorSubClass = 
"TO_CSV_COMPLEX_TYPE")
+  case _ => DataTypeMismatch(
+errorSubClass = "UNEXPECTED_INPUT_TYPE",
+messageParameters = Map(
+  "paramIndex" -> ordinalNumber(0),
+  "requiredType" -> toSQLType(StructType),
+  "inputSql" -> toSQLExpr(child),
+  "inputType" -> toSQLType(child.dataType)
+)
+  )
+}
+  }
+
+  @tailrec
+  private def supportDataType(dataType: DataType): Boolean = dataType match {
+case _: VariantType | BinaryType => false

Review Comment:
   Because `these types` cannot be `read back` through `from_csv`,
   I have added related tests in UT `CsvFunctionsSuite`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support some types of data [spark]

2024-03-11 Thread via GitHub


panbingkun commented on code in PR #44665:
URL: https://github.com/apache/spark/pull/44665#discussion_r1519260055


##
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:
##
@@ -294,10 +294,19 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("to_csv with option (nullValue)") {

Review Comment:
   Because `NullType` cannot be read back through `from_csv`.
   In the new logic, it will throw `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` 
exception.
   And as far as I understand, this UT is mainly to test whether `option 
(nullValue)` is effective,
   The test `target` of changed UT are consistent with this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-47274][PYTHON][CONNECT] Provide more useful context for PySpark DataFrame API errors [spark]

2024-03-11 Thread via GitHub


itholic commented on code in PR #45377:
URL: https://github.com/apache/spark/pull/45377#discussion_r1519270151


##
python/pyspark/errors/utils.py:
##
@@ -15,12 +15,22 @@
 # limitations under the License.
 #
 
+import builtins
 import re
-from typing import Dict, Match
+import functools
+import inspect
+import threading
+from typing import Any, Callable, Dict, Match, TypeVar, Type
+
+from IPython import get_ipython

Review Comment:
   Thanks for pointing out!
   
   btw I'm preparing new design for error context for leveraging JVM 
stacktrace. Let me address this comment along with applying the new design.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support some types of data [spark]

2024-03-11 Thread via GitHub


panbingkun commented on code in PR #44665:
URL: https://github.com/apache/spark/pull/44665#discussion_r1519273344


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala:
##
@@ -260,16 +263,33 @@ case class StructsToCsv(
   child = child,
   timeZoneId = None)
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+child.dataType match {
+  case schema: StructType if schema.map(_.dataType).forall(
+dt => isSupportedDataType(dt)) => TypeCheckSuccess
+  case _ => DataTypeMismatch(
+errorSubClass = "UNSUPPORTED_INPUT_TYPE",

Review Comment:
   Using `UNSUPPORTED_INPUT_TYPE` instead of `UNEXPECTED_INPUT_TYPE` here will 
make the prompt information more `clear` to the user, for example:
   
   ```
   val rows = new java.util.ArrayList[Row]()
   rows.add(Row(1L, Row(2L, "Alice",
 Map("math" -> 100L, "english" -> 200L, "science" -> null
   
   val valueSchema = StructType(Seq(
 StructField("age", LongType),
 StructField("name", StringType),
 StructField("scores", MapType(StringType, LongType
   val schema = StructType(Seq(
 StructField("key", LongType),
 StructField("value", valueSchema)))
   
   val df = spark.createDataFrame(rows, schema)
   
   df.select(to_csv($"value")).collect()
   ```
   
   Before(Last review):
   Users may feel confused about prompt: `requires the "STRUCT" type, however 
"value" has the type "STRUCT...`
   ```
   [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "to_csv(value)" due 
to data type mismatch: The 1 parameter requires the "STRUCT" type, however 
"value" has the type "STRUCT>". SQLSTATE: 42K09;
   ```
   
   After(Right now):
   ```
   [DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE] Cannot resolve "to_csv(value)" 
due to data type mismatch: The input of `to_csv` can't be "STRUCT>" type data. SQLSTATE: 42K09;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46654][SQL][Python] Make `to_csv` explicitly indicate that it does not support some types of data [spark]

2024-03-11 Thread via GitHub


panbingkun commented on code in PR #44665:
URL: https://github.com/apache/spark/pull/44665#discussion_r1519273344


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala:
##
@@ -260,16 +263,33 @@ case class StructsToCsv(
   child = child,
   timeZoneId = None)
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+child.dataType match {
+  case schema: StructType if schema.map(_.dataType).forall(
+dt => isSupportedDataType(dt)) => TypeCheckSuccess
+  case _ => DataTypeMismatch(
+errorSubClass = "UNSUPPORTED_INPUT_TYPE",

Review Comment:
   Using `UNSUPPORTED_INPUT_TYPE` instead of `UNEXPECTED_INPUT_TYPE` here will 
make the prompt information more `clear` to the user, 
   
   for example:
   ```
   val rows = new java.util.ArrayList[Row]()
   rows.add(Row(1L, Row(2L, "Alice",
 Map("math" -> 100L, "english" -> 200L, "science" -> null
   
   val valueSchema = StructType(Seq(
 StructField("age", LongType),
 StructField("name", StringType),
 StructField("scores", MapType(StringType, LongType
   val schema = StructType(Seq(
 StructField("key", LongType),
 StructField("value", valueSchema)))
   
   val df = spark.createDataFrame(rows, schema)
   
   df.select(to_csv($"value")).collect()
   ```
   
   Before(Last review):
   Users may feel confused about prompt: `requires the "STRUCT" type, however 
"value" has the type "STRUCT...`
   ```
   [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "to_csv(value)" due 
to data type mismatch: The 1 parameter requires the "STRUCT" type, however 
"value" has the type "STRUCT>". SQLSTATE: 42K09;
   ```
   
   After(Right now):
   ```
   [DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE] Cannot resolve "to_csv(value)" 
due to data type mismatch: The input of `to_csv` can't be "STRUCT>" type data. SQLSTATE: 42K09;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-47338][SQL] Introduce `_LEGACY_ERROR_UNKNOWN` for default error class [spark]

2024-03-11 Thread via GitHub


itholic opened a new pull request, #45457:
URL: https://github.com/apache/spark/pull/45457

   
   
   ### What changes were proposed in this pull request?
   
   This PR proposes to introduce `_LEGACY_ERROR_UNKNOWN` for default error 
class when error class is not defined.
   
   ### Why are the changes needed?
   
   In Spark, when an `errorClass` is not explicitly defined for an exception, 
the method `getErrorClass` returns null so far.
   
   This behavior can lead to ambiguity and makes debugging more challenging by 
not providing a clear indication that the error class was not set.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No API changes, but the user-facing error message will contain 
`_LEGACY_ERROR_UNKNOWN` when error class is not specified.
   
   ### How was this patch tested?
   
   The existing CI should pass.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-47194][BUILD] Upgrade log4j to 2.23.1 [spark]

2024-03-11 Thread via GitHub


panbingkun commented on PR #45326:
URL: https://github.com/apache/spark/pull/45326#issuecomment-1987848933

   @LuciferYang @dongjoon-hyun 
   
   The version `2.23.1` of log4j2 has been released, which has resolved the 
issue with the `StatusLogger` in the version `1.13.0`, as follows:
   https://github.com/apache/spark/assets/15246973/c25a71b0-1862-45b5-8d08-194b0343eb64";>
   
   Let it run through GA first.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519302618


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -410,7 +414,20 @@ public boolean endsWith(final UTF8String suffix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().endsWith(suffix.toLowerCase());
 }
-return matchAt(suffix, numBytes - suffix.numBytes, collationId);
+return collatedEndsWith(suffix, collationId);
+  }
+
+  private boolean collatedEndsWith(final UTF8String suffix, int collationId) {
+if (suffix.numBytes == 0 || this.numBytes == 0) {
+  return suffix.numBytes == 0;
+}
+if (suffix.numChars() > this.numChars()) {
+  return false;
+}
+return CollationFactory.getStringSearch(
+  this.substring(this.numChars()-suffix.numChars(), this.numChars()),

Review Comment:
   `this.numChars() - suffix.numChars()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519302246


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +389,18 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+return collatedStartsWith(prefix, collationId);
+  }
+
+  private boolean collatedStartsWith(final UTF8String prefix, int collationId) 
{
+if (prefix.numBytes == 0 || this.numBytes == 0) {
+  return prefix.numBytes==0;

Review Comment:
   `return prefix.numBytes == 0;`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45827][SQL] Add variant singleton type for Java [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45455:
URL: https://github.com/apache/spark/pull/45455#issuecomment-1987868783

   The test failure is not related to the changes: [Run / Run Docker 
integration 
tests](https://github.com/richardc-db/spark/actions/runs/8228264424/job/22497420892#logs)
   
   +1, LGTM. Merging to master.
   Thank you, @richardc-db and @HyukjinKwon for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45827][SQL] Add variant singleton type for Java [spark]

2024-03-11 Thread via GitHub


MaxGekk closed pull request #45455: [SPARK-45827][SQL] Add variant singleton 
type for Java
URL: https://github.com/apache/spark/pull/45455


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [MINOR][DOCS][SQL] Followup to fix doc comment for coalescePartitions.parallelismFirst [spark]

2024-03-11 Thread via GitHub


eejbyfeldt opened a new pull request, #45458:
URL: https://github.com/apache/spark/pull/45458

   We missed that this doc is duplicated while fixing it in 
https://github.com/apache/spark/pull/45437
   
   
   
   ### What changes were proposed in this pull request?
   
   Documentation fix.
   
   
   ### Why are the changes needed?
   
   Current doc is wrong.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   N/A
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][DOCS][SQL] Fix doc comment for coalescePartitions.parallelismFirst [spark]

2024-03-11 Thread via GitHub


eejbyfeldt commented on PR #45437:
URL: https://github.com/apache/spark/pull/45437#issuecomment-1987872112

   Realised that this doc comment is actually in 2 places: 
https://github.com/apache/spark/pull/45458


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45245][PYTHON][CONNECT] PythonWorkerFactory: Timeout if worker does not connect back. [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #43023:
URL: https://github.com/apache/spark/pull/43023#discussion_r1519318552


##
core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.net.SocketTimeoutException
+
+// scalastyle:off executioncontextglobal
+import scala.concurrent.ExecutionContext.Implicits.global
+// scalastyle:on executioncontextglobal
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.ThreadUtils
+
+// Tests for PythonWorkerFactory.
+class PythonWorkerFactorySuite extends SparkFunSuite with Matchers with 
SharedSparkContext {

Review Comment:
   Does this suite have to extend `Matchers`? @HyukjinKwon can you double check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


stefankandic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519323364


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {

Review Comment:
   i thought about this for a bit and we should probably improve this logic so 
that the explicit collation has the utmost precedence, ie if there is a one 
input with explicit and another with indeterminate the output should be 
explicit as well
   
   the precedence hierarchy should probably go like this:
   1. no collation
   2. implicit collation
   3. indeterminate collation
   4. explicit collation
   
   what do you think about this approach? It seems to me that this is the pgsql 
behaviour as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


stefankandic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519324890


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>
+  val dataTypes = exprs.map(_.dataType.asInstanceOf[StringType])
+
+  if (hasIndeterminate(dataTypes)) {
+if (failOnIndeterminate) {

Review Comment:
   also a sidenote about failOnIndeterminate; currently if will result 
sometimes in analysis and sometimes in runtime. Is this okay and should we 
maybe only fail in one place?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519347155


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -438,6 +439,19 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
 }
   }
 
+  test("test concurrently generating collation keys") {

Review Comment:
   Can this test go to `CollationFactorySuite`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1519254087


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.

Review Comment:
   Say `str, int, bool`, as these are actual representation of "python" type? 
   https://docs.python.org/3/library/json.html#json.dump
   
   Also do we disallow other types or they are just discouraged to use (for us) 
so we don't document? There are more types JSON module in the python standard 
library supports.
   https://docs.python.org/3/library/json.html#json.JSONDecoder



##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,133 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes
+Integer, String and Boolean.
+
+Examples
+
+>>> def initialOffset(self):
+... return {"parititon-1": {"index": 3, "closed": True}, 
"partition-2": {"index": 5}}
+"""
+
+...
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={"feature": "initialOffset"},
+)
+
+def latestOffset(self) -> dict:
+"""
+Returns the most recent offset available.
+
+Returns
+---
+dict
+A dict or recursive dict whose key and value are primitive types, 
which includes

Review Comment:
   ditto



##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"offset": {"partition-1": 0}}
+  |def latestOffset(self):
+  |return {"offset": {"partition-1": 2}}
+  |def partitions(self, start: dict, end: dict):
+  |

Re: [PR] [WIP][SPARK-47169][SQL] Disable bucketing on collated columns [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45260:
URL: https://github.com/apache/spark/pull/45260#discussion_r1519359745


##
common/utils/src/main/resources/error/error-classes.json:
##
@@ -1752,6 +1752,12 @@
 },
 "sqlState" : "22003"
   },
+  "INVALID_BUCKET_COLUMN_DATA_TYPE" : {
+"message" : [
+  "Cannot use  for bucket column."

Review Comment:
   Changed. Is this sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


stefankandic commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519361380


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -138,11 +138,13 @@ public Collation(
 collationTable[2] = new Collation(
   "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
 collationTable[2].collator.setStrength(Collator.TERTIARY);
+collationTable[2].collator.freeze();

Review Comment:
   @cloud-fan collator uses a buffer while writing collation keys, freezing it 
makes this operation safe by using a reentrant lock around it 
([source](https://github.com/unicode-org/icu/blob/main/icu4j/main/collate/src/main/java/com/ibm/icu/text/RuleBasedCollator.java#L1831))
   
   this of course raises performance issues which we should probably discuss, 
because now only one thread can generate sort keys



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


stefankandic commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519362819


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -438,6 +439,19 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
 }
   }
 
+  test("test concurrently generating collation keys") {

Review Comment:
   it could but I decided to put it here because it would require adding a new 
dependency for parallel collections which I'd like to avoid



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519361693


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -378,13 +378,6 @@ public boolean matchAt(final UTF8String s, int pos) {
 return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, 
s.numBytes);
   }
 
-  private boolean matchAt(final UTF8String s, int pos, int collationId) {
-if (s.numBytes + pos > numBytes || pos < 0) {
-  return false;
-}
-return this.substring(pos, pos + s.numBytes).semanticCompare(s, 
collationId) == 0;

Review Comment:
   I wonder can't we just use `CollationFactory.getStringSearch` here?



##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +389,18 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+return collatedStartsWith(prefix, collationId);
+  }
+
+  private boolean collatedStartsWith(final UTF8String prefix, int collationId) 
{

Review Comment:
   Could you point out which unit tests in `UTF8StringSuite` check the 
functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


stefankandic commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519361380


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -138,11 +138,13 @@ public Collation(
 collationTable[2] = new Collation(
   "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
 collationTable[2].collator.setStrength(Collator.TERTIARY);
+collationTable[2].collator.freeze();

Review Comment:
   @cloud-fan collator uses a buffer while writing collation keys, freezing it 
makes this operation safe by using a reentrant lock around it 
([source](https://github.com/unicode-org/icu/blob/main/icu4j/main/collate/src/main/java/com/ibm/icu/text/RuleBasedCollator.java#L1831))
   
   this of course raises performance issues which we should probably discuss, 
because now we can't generate sort keys in parallel on a single collator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519379776


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +389,18 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+return collatedStartsWith(prefix, collationId);
+  }
+
+  private boolean collatedStartsWith(final UTF8String prefix, int collationId) 
{

Review Comment:
   I think we test these functions only in CollationSuite now, as they are only 
used in context of collations anyways. To remove clutter, we opted to remove 
unit tests from `UTF8StringSuite` and `StringExpressionSuite` because those in 
`CollationSuite` seem to do just fine



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46043][SQL][FOLLOWUP] do not resolve v2 table provider with custom session catalog [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45440:
URL: https://github.com/apache/spark/pull/45440#discussion_r1517821667


##
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala:
##
@@ -379,10 +379,6 @@ private[sql] object CatalogV2Util {
 }
   }
 
-  def loadRelation(catalog: CatalogPlugin, ident: Identifier): 
Option[NamedRelation] = {

Review Comment:
   This is a test-only function. I found it as I was searching all the call 
sites of `loadTable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519387286


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -378,13 +378,6 @@ public boolean matchAt(final UTF8String s, int pos) {
 return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, 
s.numBytes);
   }
 
-  private boolean matchAt(final UTF8String s, int pos, int collationId) {
-if (s.numBytes + pos > numBytes || pos < 0) {
-  return false;
-}
-return this.substring(pos, pos + s.numBytes).semanticCompare(s, 
collationId) == 0;

Review Comment:
   We can, that's exactly what Stevo is doing - removing this code and 
replacing it with `CollationFactory.getStringSearch`. For more context: the old 
implementation was there before we introduced `StringSearch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


uros-db commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519387286


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -378,13 +378,6 @@ public boolean matchAt(final UTF8String s, int pos) {
 return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, 
s.numBytes);
   }
 
-  private boolean matchAt(final UTF8String s, int pos, int collationId) {
-if (s.numBytes + pos > numBytes || pos < 0) {
-  return false;
-}
-return this.substring(pos, pos + s.numBytes).semanticCompare(s, 
collationId) == 0;

Review Comment:
   We can, that's exactly what Stevo is doing - removing this code and 
replacing it with `CollationFactory.getStringSearch`. For more context: the old 
implementation was there before we introduced `StringSearch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519395144


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -138,11 +138,13 @@ public Collation(
 collationTable[2] = new Collation(
   "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
 collationTable[2].collator.setStrength(Collator.TERTIARY);
+collationTable[2].collator.freeze();

Review Comment:
   they should use one buffer per thread... Anyway this is out of our control 
and calling `freeze` LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-45245][CONNECT][TESTS][FOLLOW-UP] Remove unneeded Matchers trait in the test [spark]

2024-03-11 Thread via GitHub


HyukjinKwon opened a new pull request, #45459:
URL: https://github.com/apache/spark/pull/45459

   ### What changes were proposed in this pull request?
   
   This PR is a followup of https://github.com/apache/spark/pull/43023 that 
addresses a post-review comment.
   
   ### Why are the changes needed?
   
   It is unnecessary. It also matters with Scala compatibility so should better 
remove if unused.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, test-only.
   
   ### How was this patch tested?
   
   Manually.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45245][PYTHON][CONNECT] PythonWorkerFactory: Timeout if worker does not connect back. [spark]

2024-03-11 Thread via GitHub


HyukjinKwon commented on code in PR #43023:
URL: https://github.com/apache/spark/pull/43023#discussion_r1519396300


##
core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.net.SocketTimeoutException
+
+// scalastyle:off executioncontextglobal
+import scala.concurrent.ExecutionContext.Implicits.global
+// scalastyle:on executioncontextglobal
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.SharedSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.ThreadUtils
+
+// Tests for PythonWorkerFactory.
+class PythonWorkerFactorySuite extends SparkFunSuite with Matchers with 
SharedSparkContext {

Review Comment:
   Thanks. Made a followup PR: https://github.com/apache/spark/pull/45459



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45245][CONNECT][TESTS][FOLLOW-UP] Remove unneeded Matchers trait in the test [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45459:
URL: https://github.com/apache/spark/pull/45459#discussion_r1519406352


##
core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala:
##
@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.util.ThreadUtils
 
 // Tests for PythonWorkerFactory.
-class PythonWorkerFactorySuite extends SparkFunSuite with Matchers with 
SharedSparkContext {

Review Comment:
   need to remove the import as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47327][SQL] Fix thread safety issue in ICU Collator [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45436:
URL: https://github.com/apache/spark/pull/45436#discussion_r1519437339


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -138,11 +138,13 @@ public Collation(
 collationTable[2] = new Collation(
   "UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true);
 collationTable[2].collator.setStrength(Collator.TERTIARY);
+collationTable[2].collator.freeze();

Review Comment:
   Yeah, as soon as we get benchmarks working we should revisit this decision.
   One option that we also prototyped is to keep `Collator` in `ThreadLocal` 
fields, which also solved the problem. But `freeze` is a bit cleaner and we 
don't have microbenchmarks yet so we can't make data driven decision at this 
point.
   LGTM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519451854


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +389,18 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+return collatedStartsWith(prefix, collationId);
+  }
+
+  private boolean collatedStartsWith(final UTF8String prefix, int collationId) 
{

Review Comment:
   I think that @MaxGekk has a point.
   `CollationSuite` becomes a bit over cluttered and it should be used for for 
E2E testing. Can we either add unit test to `UTF8StringSuite` or create 
`UTF8StringSuiteWithCollation` suite?
   
   Option is also to use `CollationExpressionSuite`.
   
   I would propose following:
   1) E2E collation tests go to `CollationSuite`.
   2) String level tests go to either `UTF8StringSuite` or 
`UTF8StringWithCollationSuite`.
   3) Expression level tests go to `CollationExpressionSuite`.
   4) Collation management tests to to `CollationFactorySuite`.
   
   @MaxGekk and @cloud-fan - what are your thought on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][DOCS][SQL] Fix doc comment for coalescePartitions.parallelismFirst on sql-performance-tuning page [spark]

2024-03-11 Thread via GitHub


yaooqinn closed pull request #45458:  [MINOR][DOCS][SQL] Fix doc comment for 
coalescePartitions.parallelismFirst on sql-performance-tuning page
URL: https://github.com/apache/spark/pull/45458


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][DOCS][SQL] Fix doc comment for coalescePartitions.parallelismFirst on sql-performance-tuning page [spark]

2024-03-11 Thread via GitHub


yaooqinn commented on PR #45458:
URL: https://github.com/apache/spark/pull/45458#issuecomment-1988054085

   Thank you, merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519546855


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -396,7 +389,18 @@ public boolean startsWith(final UTF8String prefix, int 
collationId) {
 if (collationId == CollationFactory.LOWERCASE_COLLATION_ID) {
   return this.toLowerCase().startsWith(prefix.toLowerCase());
 }
-return matchAt(prefix, 0, collationId);
+return collatedStartsWith(prefix, collationId);
+  }
+
+  private boolean collatedStartsWith(final UTF8String prefix, int collationId) 
{

Review Comment:
   SGTM, and +1 to `UTF8StringWithCollationSuite`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519561653


##
sql/api/src/main/scala/org/apache/spark/sql/types/StringType.scala:
##
@@ -41,14 +41,22 @@ class StringType private(val collationId: Int) extends 
AtomicType with Serializa
*/
   def isBinaryCollation: Boolean = 
CollationFactory.fetchCollation(collationId).isBinaryCollation
 
+  /**
+   * Returns whether the collation is indeterminate. An indeterminate 
collation is
+   * a result of combination of conflicting non-default implicit collations.
+   */
+  def isIndeterminateCollation: Boolean = collationId == 
CollationFactory.INDETERMINATE_COLLATION_ID
+
   /**
* Type name that is shown to the customer.
* If this is an UCS_BASIC collation output is `string` due to backwards 
compatibility.
*/
   override def typeName: String =
 if (isDefaultCollation) "string"
+else if (isIndeterminateCollation) s"string COLLATE 
INDETERMINATE_COLLATION"
 else s"string COLLATE 
${CollationFactory.fetchCollation(collationId).collationName}"

Review Comment:
   This change will require changes on PySpark side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519567374


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -138,21 +140,31 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   @scala.annotation.tailrec
   private def findWiderTypeForString(dt1: DataType, dt2: DataType): 
Option[DataType] = {
 (dt1, dt2) match {
-  case (StringType, _: IntegralType) => Some(LongType)
-  case (StringType, _: FractionalType) => Some(DoubleType)
-  case (StringType, NullType) => Some(StringType)
+  case (_: StringType, _: IntegralType) => Some(LongType)
+  case (_: StringType, _: FractionalType) => Some(DoubleType)
+  case (st: StringType, NullType) => Some(st)
   // If a binary operation contains interval type and string, we can't 
decide which
   // interval type the string should be promoted as. There are many 
possible interval
   // types, such as year interval, month interval, day interval, hour 
interval, etc.
-  case (StringType, _: AnsiIntervalType) => None
-  case (StringType, a: AtomicType) => Some(a)
-  case (other, StringType) if other != StringType => 
findWiderTypeForString(StringType, other)
+  case (_: StringType, _: AnsiIntervalType) => None
+  case (_: StringType, a: AtomicType) => Some(a)
+  case (other, st: StringType) if !other.isInstanceOf[StringType] =>
+findWiderTypeForString(st, other)
   case _ => None
 }
   }
 
-  override def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
-types.foldLeft[Option[DataType]](Some(NullType))((r, c) =>
+  override def findWiderCommonType(exprs: Seq[Expression],
+   failOnIndeterminate: Boolean = false): 
Option[DataType] = {
+(if (exprs.map(_.dataType).partition(hasStringType)._1.distinct.size > 1) {

Review Comment:
   If I am reading this correctly you are just trying to figure out whether 
there is a expr with `StringType`? Can't you just say 
`exprs.exists(_.dataType.HasStringType)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on PR #45373:
URL: https://github.com/apache/spark/pull/45373#issuecomment-1988232237

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally [spark]

2024-03-11 Thread via GitHub


cloud-fan closed pull request #45373: [SPARK-47270][SQL] Dataset.isEmpty 
projects CommandResults locally
URL: https://github.com/apache/spark/pull/45373


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519574637


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -138,21 +140,31 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   @scala.annotation.tailrec
   private def findWiderTypeForString(dt1: DataType, dt2: DataType): 
Option[DataType] = {
 (dt1, dt2) match {
-  case (StringType, _: IntegralType) => Some(LongType)
-  case (StringType, _: FractionalType) => Some(DoubleType)
-  case (StringType, NullType) => Some(StringType)
+  case (_: StringType, _: IntegralType) => Some(LongType)
+  case (_: StringType, _: FractionalType) => Some(DoubleType)
+  case (st: StringType, NullType) => Some(st)
   // If a binary operation contains interval type and string, we can't 
decide which
   // interval type the string should be promoted as. There are many 
possible interval
   // types, such as year interval, month interval, day interval, hour 
interval, etc.
-  case (StringType, _: AnsiIntervalType) => None
-  case (StringType, a: AtomicType) => Some(a)
-  case (other, StringType) if other != StringType => 
findWiderTypeForString(StringType, other)
+  case (_: StringType, _: AnsiIntervalType) => None
+  case (_: StringType, a: AtomicType) => Some(a)
+  case (other, st: StringType) if !other.isInstanceOf[StringType] =>
+findWiderTypeForString(st, other)
   case _ => None
 }
   }
 
-  override def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
-types.foldLeft[Option[DataType]](Some(NullType))((r, c) =>
+  override def findWiderCommonType(exprs: Seq[Expression],
+   failOnIndeterminate: Boolean = false): 
Option[DataType] = {
+(if (exprs.map(_.dataType).partition(hasStringType)._1.distinct.size > 1) {
+  val collationId = CollationTypeCasts.getOutputCollation(exprs, 
failOnIndeterminate)
+  exprs.map(e =>
+if (hasStringType(e.dataType)) {
+  castStringType(e.dataType, collationId)
+  e
+}
+else e)
+} else 
exprs).map(_.dataType).foldLeft[Option[DataType]](Some(NullType))((r, c) =>
   r match {
 case Some(d) => findWiderTypeForTwo(d, c)

Review Comment:
   I find this pretty weird.
   Why can't we just rely on `fold` + `findWiderTypeForTwo` logic? I think that 
type checks should remain `foldable` even with collation concept? i.e. we 
should always be able to determine output collation by just comparing two 
expressions and eventually folding to the result?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519576239


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -173,6 +185,8 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   inType: DataType,
   expectedType: AbstractDataType): Option[DataType] = {
 (inType, expectedType) match {
+  case (_: StringType, st: StringType) =>
+Some(st)

Review Comment:
   Can you use explicit `isDefaultCollation`? In general, let's stay away of 
using `StringType` case object. We left it due to backwards compatability, but 
we should use explicit `st: StringType if st.isDefaultCollation` checks in 
Spark code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1519578742


##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +897,138 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+withSQLConf(
+  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+  SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+  withTable("bucketed_table1", "bucketed_table2", "bucketed_table3") {
+val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", 
"j", "k")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table1")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table2")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table3")
+
+val warehouseFilePath = new 
URI(spark.sessionState.conf.warehousePath).getPath
+val tableDir = new File(warehouseFilePath, "bucketed_table2")
+Utils.deleteRecursively(tableDir)
+df.write.parquet(tableDir.getAbsolutePath)

Review Comment:
   What does this do? Remove the data then write the same data again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519579806


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1

Review Comment:
   Let's again be explicit here and check whether collation ids are different.
   In future we may want to extend `StringType` with other fields (e.g. max 
lenght) and for such strings this rule shouldn't kick in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47148][SQL] Avoid to materialize AQE ExchangeQueryStageExec on the cancellation [spark]

2024-03-11 Thread via GitHub


cloud-fan commented on code in PR #45234:
URL: https://github.com/apache/spark/pull/45234#discussion_r1519582719


##
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##
@@ -897,6 +897,138 @@ class AdaptiveQueryExecSuite
 }
   }
 
+  test("SPARK-47148: AQE should avoid to materialize ShuffleQueryStage on the 
cancellation") {
+withSQLConf(
+  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+  SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+  withTable("bucketed_table1", "bucketed_table2", "bucketed_table3") {
+val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", 
"j", "k")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table1")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table2")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table3")
+
+val warehouseFilePath = new 
URI(spark.sessionState.conf.warehousePath).getPath
+val tableDir = new File(warehouseFilePath, "bucketed_table2")
+Utils.deleteRecursively(tableDir)
+df.write.parquet(tableDir.getAbsolutePath)
+
+val aggDF1 = spark.table("bucketed_table1").groupBy("i", "j", 
"k").count().repartition(5)
+val aggDF2 = spark.table("bucketed_table2").groupBy("i", 
"j").count().repartition(5)
+val aggDF3 = 
spark.table("bucketed_table3").groupBy("i").count().repartition(5)
+val joinedDF = aggDF1.join(aggDF2, Seq("i", "j")).join(aggDF3, 
Seq("i"))
+
+val error = intercept[Exception] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "Invalid bucket file")
+assert(error.getSuppressed.size === 0)
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+
+// There should not be BroadcastQueryStageExec
+val broadcastQueryStageExecs = collect(adaptivePlan) {
+  case r: BroadcastQueryStageExec => r
+}
+assert(broadcastQueryStageExecs.isEmpty, s"$adaptivePlan")
+
+// All QueryStages should be based on ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case r: ShuffleQueryStageExec => r
+}
+assert(shuffleQueryStageExecs.length == 3, s"$adaptivePlan")
+// First ShuffleQueryStage is materialized so it needs to be canceled.
+assert(shuffleQueryStageExecs(0).isMaterializationStarted(),
+  "Materialization should be started.")
+// Second ShuffleQueryStage materialization is failed so
+// it is excluded from the cancellation due to earlyFailedStage.
+assert(shuffleQueryStageExecs(1).isMaterializationStarted(),
+  "Materialization should be started but it is failed.")
+// Last ShuffleQueryStage is not materialized yet so it does not 
require
+// to be canceled and it is just skipped from the cancellation.
+assert(!shuffleQueryStageExecs(2).isMaterializationStarted(),
+  "Materialization should not be started.")
+  }
+}
+  }
+
+  test("SPARK-47148: Check if BroadcastQueryStage materialization is started") 
{
+withSQLConf(
+  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+  SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
+  withTable("bucketed_table1", "bucketed_table2", "bucketed_table3") {
+val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", 
"j", "k")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table1")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table2")
+df.write.format("parquet").bucketBy(8, 
"i").saveAsTable("bucketed_table3")
+
+val warehouseFilePath = new 
URI(spark.sessionState.conf.warehousePath).getPath
+val tableDir = new File(warehouseFilePath, "bucketed_table2")
+Utils.deleteRecursively(tableDir)
+df.write.parquet(tableDir.getAbsolutePath)
+
+val aggDF1 = spark.table("bucketed_table1").groupBy("i", "j", 
"k").count()
+val aggDF2 = spark.table("bucketed_table2").groupBy("i", "j", 
"k").count()
+val aggDF3 = spark.table("bucketed_table3").groupBy("i", "j").count()
+val joinedDF = aggDF1.join(aggDF2, Seq("i", "j", "k")).join(aggDF3, 
Seq("i", "j"))
+  .join(aggDF1, Seq("i"))
+
+val error = intercept[Exception] {
+  joinedDF.collect()
+}
+assert(error.getMessage() contains "Invalid bucket file")
+assert(error.getSuppressed.size === 0)
+
+val adaptivePlan = 
joinedDF.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+// There should not be ShuffleQueryStageExec
+val shuffleQueryStageExecs = collect(adaptivePlan) {
+  case r: ShuffleQueryStageExec => 

Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519581505


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {

Review Comment:
   I would like to see tests for this. Can you add them to `TypeCoercionSuite`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519587302


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct

Review Comment:
   Again, check collation id explicitly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519589124


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>

Review Comment:
   Maybe it would be clearer just to write `case 0` here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519594237


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>
+  val dataTypes = exprs.map(_.dataType.asInstanceOf[StringType])
+
+  if (hasIndeterminate(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.indeterminateCollationError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else if (hasMultipleImplicits(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.implicitCollationMismatchError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else {
+dataTypes.find(!_.isDefaultCollation)
+  .getOrElse(StringType)
+  .collationId

Review Comment:
   I don't get this. This is the case where:
   1) We don't have inteterminate.
   2) Don'tr have multiple implicits.
   
   So, what we do is find first non default collation (?) or map to the default 
one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519598303


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>
+  val dataTypes = exprs.map(_.dataType.asInstanceOf[StringType])
+
+  if (hasIndeterminate(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.indeterminateCollationError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else if (hasMultipleImplicits(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.implicitCollationMismatchError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else {
+dataTypes.find(!_.isDefaultCollation)
+  .getOrElse(StringType)
+  .collationId
+  }
+  }
+}
+
+private def hasIndeterminate(dataTypes: Seq[StringType]): Boolean =
+  dataTypes.exists(_.isIndeterminateCollation)
+
+
+private def hasMultipleImplicits(dataTypes: Seq[StringType]): Boolean =
+  dataTypes.filter(!_.isDefaultCollation).distinct.size > 1

Review Comment:
   Hm, is `isDefaultCollation` sufficient check to determine whether this is a 
non-implicit collation?
   Can you also add a comment describint what is definition of implicit and 
explicit collations?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings [spark]

2024-03-11 Thread via GitHub


yaooqinn commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1988293451

   Do we need to revise unbase64 accordingly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47337][SQL][DOCKER] Upgrade DB2 docker image version to 11.5.8.0 [spark]

2024-03-11 Thread via GitHub


yaooqinn closed pull request #45456: [SPARK-47337][SQL][DOCKER] Upgrade DB2 
docker image version to 11.5.8.0
URL: https://github.com/apache/spark/pull/45456


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47337][SQL][DOCKER] Upgrade DB2 docker image version to 11.5.8.0 [spark]

2024-03-11 Thread via GitHub


yaooqinn commented on PR #45456:
URL: https://github.com/apache/spark/pull/45456#issuecomment-1988294658

   Thank you  @LuciferYang, merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47328][SQL] Rename UCS_BASIC collation to UTF8_BINARY [spark]

2024-03-11 Thread via GitHub


MaxGekk closed pull request #45442: [SPARK-47328][SQL] Rename UCS_BASIC 
collation to UTF8_BINARY
URL: https://github.com/apache/spark/pull/45442


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47328][SQL] Rename UCS_BASIC collation to UTF8_BINARY [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45442:
URL: https://github.com/apache/spark/pull/45442#issuecomment-1988350585

   +1, LGTM. Merging to master.
   Thank you, @stefankandic and @srielau for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings [spark]

2024-03-11 Thread via GitHub


ted-jenks commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1988371956

   > Do we need to revise unbase64 accordingly?
   
   Unbase64 uses the Mime decoder, which can tolerate chunked and unchunked 
data.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings [spark]

2024-03-11 Thread via GitHub


yaooqinn commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1988397848

   thank you for the explanation @ted-jenks 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519714112


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -138,21 +140,31 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   @scala.annotation.tailrec
   private def findWiderTypeForString(dt1: DataType, dt2: DataType): 
Option[DataType] = {
 (dt1, dt2) match {
-  case (StringType, _: IntegralType) => Some(LongType)
-  case (StringType, _: FractionalType) => Some(DoubleType)
-  case (StringType, NullType) => Some(StringType)
+  case (_: StringType, _: IntegralType) => Some(LongType)
+  case (_: StringType, _: FractionalType) => Some(DoubleType)
+  case (st: StringType, NullType) => Some(st)
   // If a binary operation contains interval type and string, we can't 
decide which
   // interval type the string should be promoted as. There are many 
possible interval
   // types, such as year interval, month interval, day interval, hour 
interval, etc.
-  case (StringType, _: AnsiIntervalType) => None
-  case (StringType, a: AtomicType) => Some(a)
-  case (other, StringType) if other != StringType => 
findWiderTypeForString(StringType, other)
+  case (_: StringType, _: AnsiIntervalType) => None
+  case (_: StringType, a: AtomicType) => Some(a)
+  case (other, st: StringType) if !other.isInstanceOf[StringType] =>
+findWiderTypeForString(st, other)
   case _ => None
 }
   }
 
-  override def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
-types.foldLeft[Option[DataType]](Some(NullType))((r, c) =>
+  override def findWiderCommonType(exprs: Seq[Expression],
+   failOnIndeterminate: Boolean = false): 
Option[DataType] = {
+(if (exprs.map(_.dataType).partition(hasStringType)._1.distinct.size > 1) {

Review Comment:
   I am trying to see if there are at least 2 different StringTypes. In that 
case we need to do casting of inputs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


dbatomic commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519730371


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -138,21 +140,31 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   @scala.annotation.tailrec
   private def findWiderTypeForString(dt1: DataType, dt2: DataType): 
Option[DataType] = {
 (dt1, dt2) match {
-  case (StringType, _: IntegralType) => Some(LongType)
-  case (StringType, _: FractionalType) => Some(DoubleType)
-  case (StringType, NullType) => Some(StringType)
+  case (_: StringType, _: IntegralType) => Some(LongType)
+  case (_: StringType, _: FractionalType) => Some(DoubleType)
+  case (st: StringType, NullType) => Some(st)
   // If a binary operation contains interval type and string, we can't 
decide which
   // interval type the string should be promoted as. There are many 
possible interval
   // types, such as year interval, month interval, day interval, hour 
interval, etc.
-  case (StringType, _: AnsiIntervalType) => None
-  case (StringType, a: AtomicType) => Some(a)
-  case (other, StringType) if other != StringType => 
findWiderTypeForString(StringType, other)
+  case (_: StringType, _: AnsiIntervalType) => None
+  case (_: StringType, a: AtomicType) => Some(a)
+  case (other, st: StringType) if !other.isInstanceOf[StringType] =>
+findWiderTypeForString(st, other)
   case _ => None
 }
   }
 
-  override def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
-types.foldLeft[Option[DataType]](Some(NullType))((r, c) =>
+  override def findWiderCommonType(exprs: Seq[Expression],
+   failOnIndeterminate: Boolean = false): 
Option[DataType] = {
+(if (exprs.map(_.dataType).partition(hasStringType)._1.distinct.size > 1) {

Review Comment:
   I see. Still, I think partition + select first in tuple is pretty weird way 
to filter?
   Shouldn't `exprs.map(_.dataType).filter(hasStringType).distinct.size ` be 
more performant and readable?
   
   All of that + comment that you shouldn't be using `distinct` against 
`StringType` for checking collation equivalence since `StringType` may in 
future hold other properties. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-47341][Connect] Replace commands with relations in a few tests in SparkConnectClientSuite [spark]

2024-03-11 Thread via GitHub


xi-db opened a new pull request, #45460:
URL: https://github.com/apache/spark/pull/45460

   ### What changes were proposed in this pull request?
   
   
   A few 
[tests](https://github.com/apache/spark/blob/master/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala#L481-L527)
 in SparkConnectClientSuite attempt to test the result collection of a 
reattachable execution through the use of a SQL command. The SQL command, on a 
real server, is not executed eagerly (since it is a select command) and thus, 
is not entirely accurate. The test itself is non-problematic since a dummy 
server with dummy responses is used but a small improvement here would be to 
construct a relation rather than a command.
   
   ### Why are the changes needed?
   
   Although these tests are not problematic, we should ensure that the behavior 
of the tests is consistent with the actual behavior of real servers to avoid 
misleading developers.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   The tests in org.apache.spark.sql.connect.client.SparkConnectClientSuite 
passed.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519752158


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>
+  val dataTypes = exprs.map(_.dataType.asInstanceOf[StringType])
+
+  if (hasIndeterminate(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.indeterminateCollationError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else if (hasMultipleImplicits(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.implicitCollationMismatchError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else {
+dataTypes.find(!_.isDefaultCollation)
+  .getOrElse(StringType)
+  .collationId

Review Comment:
   That is right, if we get to this point, we have only one implicit collation. 
In case we have 0 implicit non-default collations, we get to default collation 
and we return id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519754938


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala:
##
@@ -764,6 +773,94 @@ abstract class TypeCoercionBase {
 }
   }
 
+  object CollationTypeCasts extends TypeCoercionRule {
+override def transform: PartialFunction[Expression, Expression] = {
+  case e if !e.childrenResolved => e
+
+  case b @ BinaryComparison(left, right) if shouldCast(Seq(left.dataType, 
right.dataType)) =>
+val newChildren = collateToSingleType(Seq(left, right))
+b.withNewChildren(newChildren)
+}
+
+def shouldCast(types: Seq[DataType]): Boolean = {
+  types.forall(_.isInstanceOf[StringType]) && types.distinct.length > 1
+}
+
+/**
+ *  Collates the input expressions to a single collation.
+ */
+def collateToSingleType(exprs: Seq[Expression]): Seq[Expression] = {
+  val collationId = getOutputCollation(exprs)
+
+  exprs.map { expression =>
+expression.dataType match {
+  case st: StringType if st.collationId == collationId =>
+expression
+  case _: StringType =>
+Cast(expression, StringType(collationId))
+}
+  }
+}
+
+/**
+ * Based on the data types of the input expressions this method determines
+ * a collation type which the output will have.
+ */
+def getOutputCollation(exprs: Seq[Expression], failOnIndeterminate: 
Boolean = true): Int = {
+  val explicitTypes = 
exprs.filter(hasExplicitCollation).map(_.dataType).distinct
+
+  explicitTypes.size match {
+case 1 => explicitTypes.head.asInstanceOf[StringType].collationId
+case size if size > 1 =>
+  throw QueryCompilationErrors
+.explicitCollationMismatchError(
+  explicitTypes.map(t => t.asInstanceOf[StringType].typeName)
+)
+case _ =>
+  val dataTypes = exprs.map(_.dataType.asInstanceOf[StringType])
+
+  if (hasIndeterminate(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.indeterminateCollationError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else if (hasMultipleImplicits(dataTypes)) {
+if (failOnIndeterminate) {
+  throw QueryCompilationErrors.implicitCollationMismatchError()
+} else {
+  CollationFactory.INDETERMINATE_COLLATION_ID
+}
+  }
+  else {
+dataTypes.find(!_.isDefaultCollation)
+  .getOrElse(StringType)
+  .collationId
+  }
+  }
+}
+
+private def hasIndeterminate(dataTypes: Seq[StringType]): Boolean =
+  dataTypes.exists(_.isIndeterminateCollation)
+
+
+private def hasMultipleImplicits(dataTypes: Seq[StringType]): Boolean =
+  dataTypes.filter(!_.isDefaultCollation).distinct.size > 1

Review Comment:
   At this point, where we call the method, we know we do not have any explicit 
collations. This should imply we only have implicit or default collations? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [MINOR] Minor English fixes [spark]

2024-03-11 Thread via GitHub


nchammas opened a new pull request, #45461:
URL: https://github.com/apache/spark/pull/45461

   ### What changes were proposed in this pull request?
   
   Minor English grammar and wording fixes.
   
   ### Why are the changes needed?
   
   They're not strictly needed, but give the project a tiny bit more polish.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, some user-facing errors have been tweaked.
   
   ### How was this patch tested?
   
   No testing beyond CI.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519762760


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -138,21 +140,31 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   @scala.annotation.tailrec
   private def findWiderTypeForString(dt1: DataType, dt2: DataType): 
Option[DataType] = {
 (dt1, dt2) match {
-  case (StringType, _: IntegralType) => Some(LongType)
-  case (StringType, _: FractionalType) => Some(DoubleType)
-  case (StringType, NullType) => Some(StringType)
+  case (_: StringType, _: IntegralType) => Some(LongType)
+  case (_: StringType, _: FractionalType) => Some(DoubleType)
+  case (st: StringType, NullType) => Some(st)
   // If a binary operation contains interval type and string, we can't 
decide which
   // interval type the string should be promoted as. There are many 
possible interval
   // types, such as year interval, month interval, day interval, hour 
interval, etc.
-  case (StringType, _: AnsiIntervalType) => None
-  case (StringType, a: AtomicType) => Some(a)
-  case (other, StringType) if other != StringType => 
findWiderTypeForString(StringType, other)
+  case (_: StringType, _: AnsiIntervalType) => None
+  case (_: StringType, a: AtomicType) => Some(a)
+  case (other, st: StringType) if !other.isInstanceOf[StringType] =>
+findWiderTypeForString(st, other)
   case _ => None
 }
   }
 
-  override def findWiderCommonType(types: Seq[DataType]): Option[DataType] = {
-types.foldLeft[Option[DataType]](Some(NullType))((r, c) =>
+  override def findWiderCommonType(exprs: Seq[Expression],
+   failOnIndeterminate: Boolean = false): 
Option[DataType] = {
+(if (exprs.map(_.dataType).partition(hasStringType)._1.distinct.size > 1) {
+  val collationId = CollationTypeCasts.getOutputCollation(exprs, 
failOnIndeterminate)
+  exprs.map(e =>
+if (hasStringType(e.dataType)) {
+  castStringType(e.dataType, collationId)
+  e
+}
+else e)
+} else 
exprs).map(_.dataType).foldLeft[Option[DataType]](Some(NullType))((r, c) =>
   r match {
 case Some(d) => findWiderTypeForTwo(d, c)

Review Comment:
   `findWiderTypeForTwo` only accepts `dataType`'s. To extract if we have 
explicit collation, we need the expression itself. I will look into 
`findWiderTypeForTwo` and whether it makes sense to change it to accept 
expressions as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47210][SQL][COLLATION] Implicit casting on collated expressions [spark]

2024-03-11 Thread via GitHub


mihailom-db commented on code in PR #45383:
URL: https://github.com/apache/spark/pull/45383#discussion_r1519766425


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -173,6 +185,8 @@ object AnsiTypeCoercion extends TypeCoercionBase {
   inType: DataType,
   expectedType: AbstractDataType): Option[DataType] = {
 (inType, expectedType) match {
+  case (_: StringType, st: StringType) =>
+Some(st)

Review Comment:
   I am not sure I get this. We need to add the rule for casting any StringType 
class as acceptsType below would just lend us into keeping original 
collationId, and we would not be able to cast different collations into one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-45827] Fix for collation [spark]

2024-03-11 Thread via GitHub


cashmand opened a new pull request, #45463:
URL: https://github.com/apache/spark/pull/45463

   
   
   ### What changes were proposed in this pull request?
   
   https://github.com/apache/spark/pull/45409 created a default allow-list of 
types for data sources. The intent was to only prevent creation of the two 
types that had already been prevented elsewhere in code, but the match 
expression matched `StringType`, which is an object representing the default 
collation, instead of the `StringType` class, which represents any collation. 
This PR fixes the issue.
   
   ### Why are the changes needed?
   
   Without it, the previous PR would be a breaking change for data sources that 
write StringType with a non-default collation.
   
   ### Does this PR introduce _any_ user-facing change?
   
   It reverts the previous unintentional user-facing change.
   
   ### How was this patch tested?
   
   Unit test.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47343][SQL] Fix NPE when `sqlString` variable value is null string in execute immediate [spark]

2024-03-11 Thread via GitHub


milastdbx commented on code in PR #45462:
URL: https://github.com/apache/spark/pull/45462#discussion_r1519779866


##
common/utils/src/main/resources/error/error-classes.json:
##
@@ -3004,6 +3004,12 @@
 ],
 "sqlState" : "2200E"
   },
+  "NULL_QUERY_STRING_EXECUTE_IMMEDIATE" : {
+"message" : [
+  "SQLQuery string should not be null."

Review Comment:
   perhaps something like: `Execute immediate requires non-null value for 
sqlString variable, but provided  is null`



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala:
##
@@ -88,6 +88,10 @@ class SubstituteExecuteImmediate(val catalogManager: 
CatalogManager)
   throw 
QueryCompilationErrors.invalidExecuteImmediateVariableType(varReference.dataType)
 }
 
+if (varReference.eval(null) == null) {

Review Comment:
   consider storing `varReference` in variable so we don't evalulate multiple 
times



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47313][SQL] Added scala.MatchError handling inside QueryExecution.toInternalError [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45438:
URL: https://github.com/apache/spark/pull/45438#issuecomment-1988551358

   +1, LGTM. Merging to master.
   Thank you, @stevomitric and @cloud-fan for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47313][SQL] Added scala.MatchError handling inside QueryExecution.toInternalError [spark]

2024-03-11 Thread via GitHub


MaxGekk closed pull request #45438: [SPARK-47313][SQL] Added scala.MatchError 
handling inside QueryExecution.toInternalError
URL: https://github.com/apache/spark/pull/45438


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


EnricoMi opened a new pull request, #45464:
URL: https://github.com/apache/spark/pull/45464

   ### What changes were proposed in this pull request?
   Make Kubernetes resource manager support existing config 
`spark.ui.custom.executor.log.url`.
   
   Allow for
   
   
spark.ui.custom.executor.log.url="https://my.custom.url/logs?app={{APP_ID}}&executor={{EXECUTOR_ID}}";
   
   ### Why are the changes needed?
   Running Spark on Kubernetes requires persisting the logs elsewhere. Having 
the Spark UI link to those logs is very useful. This is currently only 
supported by YARN.
   
   ### Does this PR introduce _any_ user-facing change?
   Spark UI provides links to logs when run on Kubernetes.
   
   ### How was this patch tested?
   Unit test and manually tested on minikube K8S cluster.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47313][SQL] Added scala.MatchError handling inside QueryExecution.toInternalError [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45438:
URL: https://github.com/apache/spark/pull/45438#issuecomment-1988571851

   @stevomitric Congratulations with your first contribution to Apache Spark!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47255][SQL] Assign names to the error classes _LEGACY_ERROR_TEMP_323[6-7] and _LEGACY_ERROR_TEMP_324[7-9] [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45423:
URL: https://github.com/apache/spark/pull/45423#issuecomment-1988588006

   @miland-db Congratulations with your first contribution to Apache Spark!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47255][SQL] Assign names to the error classes _LEGACY_ERROR_TEMP_323[6-7] and _LEGACY_ERROR_TEMP_324[7-9] [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45423:
URL: https://github.com/apache/spark/pull/45423#issuecomment-1988584943

   +1, LGTM. Merging to master.
   Thank you, @miland-db and @HyukjinKwon for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47255][SQL] Assign names to the error classes _LEGACY_ERROR_TEMP_323[6-7] and _LEGACY_ERROR_TEMP_324[7-9] [spark]

2024-03-11 Thread via GitHub


MaxGekk closed pull request #45423: [SPARK-47255][SQL] Assign names to the 
error classes _LEGACY_ERROR_TEMP_323[6-7] and _LEGACY_ERROR_TEMP_324[7-9]
URL: https://github.com/apache/spark/pull/45423


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


mridulm commented on PR #45464:
URL: https://github.com/apache/spark/pull/45464#issuecomment-1988591514

   +CC @thejdeep 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK 46840] Add sql.execution.benchmark.CollationBenchmark.scala Scaffolding [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on PR #45453:
URL: https://github.com/apache/spark/pull/45453#issuecomment-1988594133

   @dbatomic @stefankandic Could you review this PR, please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


pan3793 commented on code in PR #45464:
URL: https://github.com/apache/spark/pull/45464#discussion_r1519858538


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala:
##
@@ -28,6 +28,46 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.Utils
 
+/**
+ * Custom implementation of CoarseGrainedExecutorBackend for Kubernetes 
resource manager.
+ * This class provides kubernetes executor attributes.
+ */
+private[spark] class KubernetesExecutorBackend(
+rpcEnv: RpcEnv,
+appId: String,
+driverUrl: String,
+executorId: String,
+bindAddress: String,
+hostname: String,
+podName: String,
+cores: Int,
+env: SparkEnv,
+resourcesFile: Option[String],
+resourceProfile: ResourceProfile)
+  extends CoarseGrainedExecutorBackend(
+rpcEnv,
+driverUrl,
+executorId,
+bindAddress,
+hostname,
+cores,
+env,
+resourcesFile,
+resourceProfile) with Logging {
+
+  override def extractAttributes: Map[String, String] = {
+super.extractAttributes ++
+  Map(
+"LOG_FILES" -> "log",
+"APP_ID" -> appId,
+"EXECUTOR_ID" -> executorId,
+"HOSTNAME" -> hostname,
+"POD_NAME" -> podName

Review Comment:
   the external log service for K8s is likely to use `namespace` and `pod name` 
to query the logs, could you please expose NAMESPACE too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings [spark]

2024-03-11 Thread via GitHub


ted-jenks commented on PR #45408:
URL: https://github.com/apache/spark/pull/45408#issuecomment-1988639946

   I am having trouble getting the failing test to pass:
   ```13:27:04.051 ERROR 
org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite```
   Giving:
   ```
   java.sql.SQLException
   [info]   org.apache.hive.service.cli.HiveSQLException: Error running query: 
[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] org.apache.spark.sql.AnalysisException: 
[WRONG_NUM_ARGS.WITHOUT_SUGGESTION] The `base64` requires 0 parameters but the 
actual number is 1. Please, refer to 
'https://spark.apache.org/docs/latest/sql-ref-functions.html' for a fix. 
SQLSTATE: 42605; line 1 pos 7
   [info]   at 
org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:43)
   [info]   at 
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation...
   ```
   Any idea why I would get this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


pan3793 commented on PR #45464:
URL: https://github.com/apache/spark/pull/45464#issuecomment-1988639547

   @EnricoMi this looks much simpler than my previous attempt 
https://github.com/apache/spark/pull/38357


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


pan3793 commented on code in PR #45464:
URL: https://github.com/apache/spark/pull/45464#discussion_r1519873684


##
docs/configuration.md:
##
@@ -1627,15 +1627,13 @@ Apart from these, the following properties are also 
available, and may be useful
   spark.ui.custom.executor.log.url
   (none)
   
-Specifies custom spark executor log URL for supporting external log 
service instead of using cluster
+Specifies custom Spark executor log URL for supporting external log 
service instead of using cluster
 managers' application log URLs in Spark UI. Spark will support some path 
variables via patterns
 which can vary on cluster manager. Please check the documentation for your 
cluster manager to
 see which patterns are supported, if any. 
 Please note that this configuration also replaces original log urls in 
event log,
 which will be also effective when accessing the application on history 
server. The new log urls must be
 permanent, otherwise you might have dead link for executor log urls.
-
-For now, only YARN mode supports this configuration

Review Comment:
   standalone?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator [spark]

2024-03-11 Thread via GitHub


sahnib commented on code in PR #45051:
URL: https://github.com/apache/spark/pull/45051#discussion_r1519943584


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -103,8 +116,12 @@ case class TransformWithStateExec(
 val keyObj = getKeyObj(keyRow)  // convert key to objects
 ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
 val valueObjIter = valueRowIter.map(getValueObj.apply)
-val mappedIterator = statefulProcessor.handleInputRows(keyObj, 
valueObjIter,
-  new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForLateEvents)).map { obj =>
+val mappedIterator = statefulProcessor.handleInputRows(
+  keyObj,
+  valueObjIter,
+  new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForLateEvents),

Review Comment:
   I discovered this as well and  updated it to `eventTimeForEviction` in my 
PR. 
https://github.com/apache/spark/pull/45376/files#diff-2bac1c42eb2edac75b4d725015d7a690269eb0869389e0347b8b6c01d222



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-47254][SQL] Assign names to the error classes _LEGACY_ERROR_TEMP_325[1-9] [spark]

2024-03-11 Thread via GitHub


MaxGekk commented on code in PR #45407:
URL: https://github.com/apache/spark/pull/45407#discussion_r1519942137


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkIntervalUtils.scala:
##
@@ -131,24 +131,21 @@ trait SparkIntervalUtils {
*/
   def stringToInterval(input: UTF8String): CalendarInterval = {
 import ParseState._
-def throwIAE(msg: String, e: Exception = null) = {
+if (input == null) {
   throw new SparkIllegalArgumentException(
-errorClass = "_LEGACY_ERROR_TEMP_3255",
+errorClass = "INVALID_INTERVAL_FORMAT.INPUT_IS_NULL",
 messageParameters = Map(
-  "input" -> Option(input).map(_.toString).getOrElse("null"),
-  "msg" -> msg),
-cause = e)
-}
-
-if (input == null) {
-  throwIAE("interval string cannot be null")
+  "input" -> Option(input).map(_.toString).getOrElse("null")))

Review Comment:
   `input` is null for sure here, just place `"null"`



##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkIntervalUtils.scala:
##
@@ -131,24 +131,21 @@ trait SparkIntervalUtils {
*/
   def stringToInterval(input: UTF8String): CalendarInterval = {
 import ParseState._
-def throwIAE(msg: String, e: Exception = null) = {
+if (input == null) {
   throw new SparkIllegalArgumentException(
-errorClass = "_LEGACY_ERROR_TEMP_3255",
+errorClass = "INVALID_INTERVAL_FORMAT.INPUT_IS_NULL",
 messageParameters = Map(
-  "input" -> Option(input).map(_.toString).getOrElse("null"),
-  "msg" -> msg),
-cause = e)
-}
-
-if (input == null) {
-  throwIAE("interval string cannot be null")
+  "input" -> Option(input).map(_.toString).getOrElse("null")))
 }
 // scalastyle:off caselocale .toLowerCase
 val s = input.trimAll().toLowerCase
 // scalastyle:on
 val bytes = s.getBytes
 if (bytes.isEmpty) {
-  throwIAE("interval string cannot be empty")
+  throw new SparkIllegalArgumentException(
+errorClass = "INVALID_INTERVAL_FORMAT.INPUT_IS_EMPTY",
+messageParameters = Map(
+  "input" -> Option(input).map(_.toString).getOrElse("null")))

Review Comment:
   Can it be `null` here? Seems not.



##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkIntervalUtils.scala:
##
@@ -183,9 +180,16 @@ trait SparkIntervalUtils {
 case PREFIX =>
   if (s.startsWith(intervalStr)) {
 if (s.numBytes() == intervalStr.numBytes()) {
-  throwIAE("interval string cannot be empty")
+  throw new SparkIllegalArgumentException(
+errorClass = "INVALID_INTERVAL_FORMAT.INPUT_IS_EMPTY",
+messageParameters = Map(
+  "input" -> Option(input).map(_.toString).getOrElse("null")))

Review Comment:
   ditto: can it be `null` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47255][SQL] Assign names to the error classes _LEGACY_ERROR_TEMP_323[6-7] and _LEGACY_ERROR_TEMP_324[7-9] [spark]

2024-03-11 Thread via GitHub


miland-db commented on PR #45423:
URL: https://github.com/apache/spark/pull/45423#issuecomment-1988766106

   Thank you! @MaxGekk and thank you @HyukjinKwon for the comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47295][SQL][COLLATION] Added ICU StringSearch for 'startsWith' and 'endsWith' functions [spark]

2024-03-11 Thread via GitHub


stevomitric commented on code in PR #45421:
URL: https://github.com/apache/spark/pull/45421#discussion_r1519971782


##
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
##
@@ -378,13 +378,6 @@ public boolean matchAt(final UTF8String s, int pos) {
 return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, 
s.numBytes);
   }
 
-  private boolean matchAt(final UTF8String s, int pos, int collationId) {
-if (s.numBytes + pos > numBytes || pos < 0) {
-  return false;
-}
-return this.substring(pos, pos + s.numBytes).semanticCompare(s, 
collationId) == 0;

Review Comment:
   Migrated `collatedStartsWith` and `collatedEndsWith` functions to here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK 46840] Add sql.execution.benchmark.CollationBenchmark.scala Scaffolding [spark]

2024-03-11 Thread via GitHub


dbatomic commented on PR #45453:
URL: https://github.com/apache/spark/pull/45453#issuecomment-1988798774

   @GideonPotok - I think that better approach for benchmarking collation track 
is to start with the basics. e.g. unit benchmarks against `CollationFactory` 
+`UTF8String`. E.g. what is the perf diff between simple filter, without the 
rest of the spark stack, between UTF8_BINARY, UTF8_BINARY_LCASE, UNICODE and 
UNICODE_CI. After filter we can do the same for hashFunction. You should be 
able to just generate bunch of UTF8Stings and guide them through 
`comparator`/`hashFunction` of `Collations` in `CollationFactory`.
   
   That way benchmarking will be actionable. Starting immediately with joins is 
too high up and I think that we will not be able to do much with the results.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


EnricoMi commented on code in PR #45464:
URL: https://github.com/apache/spark/pull/45464#discussion_r1519987571


##
docs/configuration.md:
##
@@ -1627,15 +1627,13 @@ Apart from these, the following properties are also 
available, and may be useful
   spark.ui.custom.executor.log.url
   (none)
   
-Specifies custom spark executor log URL for supporting external log 
service instead of using cluster
+Specifies custom Spark executor log URL for supporting external log 
service instead of using cluster
 managers' application log URLs in Spark UI. Spark will support some path 
variables via patterns
 which can vary on cluster manager. Please check the documentation for your 
cluster manager to
 see which patterns are supported, if any. 
 Please note that this configuration also replaces original log urls in 
event log,
 which will be also effective when accessing the application on history 
server. The new log urls must be
 permanent, otherwise you might have dead link for executor log urls.
-
-For now, only YARN mode supports this configuration

Review Comment:
   I think the `Please check the documentation for your cluster manager to see 
which patterns are supported, if any.` is sufficient, there is no need to list 
which manager supports this conf and which don't. That list easily gets 
out-dated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

2024-03-11 Thread via GitHub


peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520049076


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with 
SharedSparkSession {
   assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
 }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") 
{
+val df = Seq((1, 2)).toDF("a", "b")
+val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+checkAnswer(
+  df3,
+  Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute 
aliased") {

Review Comment:
   Can you please use `:` instead of `.` in test names? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


EnricoMi commented on code in PR #45464:
URL: https://github.com/apache/spark/pull/45464#discussion_r1520050341


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala:
##
@@ -28,6 +28,46 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.Utils
 
+/**
+ * Custom implementation of CoarseGrainedExecutorBackend for Kubernetes 
resource manager.
+ * This class provides kubernetes executor attributes.
+ */
+private[spark] class KubernetesExecutorBackend(
+rpcEnv: RpcEnv,
+appId: String,
+driverUrl: String,
+executorId: String,
+bindAddress: String,
+hostname: String,
+podName: String,
+cores: Int,
+env: SparkEnv,
+resourcesFile: Option[String],
+resourceProfile: ResourceProfile)
+  extends CoarseGrainedExecutorBackend(
+rpcEnv,
+driverUrl,
+executorId,
+bindAddress,
+hostname,
+cores,
+env,
+resourcesFile,
+resourceProfile) with Logging {
+
+  override def extractAttributes: Map[String, String] = {
+super.extractAttributes ++
+  Map(
+"LOG_FILES" -> "log",
+"APP_ID" -> appId,
+"EXECUTOR_ID" -> executorId,
+"HOSTNAME" -> hostname,
+"POD_NAME" -> podName

Review Comment:
   I have added the namespace.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47323][K8S] Support custom executor log urls [spark]

2024-03-11 Thread via GitHub


EnricoMi commented on PR #45464:
URL: https://github.com/apache/spark/pull/45464#issuecomment-1988911223

   > @EnricoMi this looks much simpler than my previous attempt #38357
   
   Thanks for the pointer! I have a PR for driver log support in the pipeline.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

2024-03-11 Thread via GitHub


peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520078411


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with 
SharedSparkSession {
   assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
 }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") 
{
+val df = Seq((1, 2)).toDF("a", "b")
+val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+checkAnswer(
+  df3,
+  Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === 
df2("aa")).select(df1("a").as("aaa"),
+  df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join1 = proj1.child.asInstanceOf[Join]
+assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join2 = proj2.child.asInstanceOf[Join]
+assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), 
df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === 
df1("a")).select(df1Joindf2("a"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   Shouldn't selecting `df1("a")` be ambiguous here? It is not ambiguous in the 
join condition because `df1Joindf2("a")` can come from only one side so the 
`df1("a")` must come from the other side. But after the join I'm not sure we 
can follow the same logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

2024-03-11 Thread via GitHub


peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520078411


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with 
SharedSparkSession {
   assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
 }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") 
{
+val df = Seq((1, 2)).toDF("a", "b")
+val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+checkAnswer(
+  df3,
+  Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === 
df2("aa")).select(df1("a").as("aaa"),
+  df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join1 = proj1.child.asInstanceOf[Join]
+assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join2 = proj2.child.asInstanceOf[Join]
+assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), 
df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === 
df1("a")).select(df1Joindf2("a"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   Shouldn't selecting `df1("a")` be ambiguous here? It is not ambiguous in the 
join condition because `df1Joindf2("a")` can come from only one side so the 
`df1("a")` must come from the other side. But after the join I'm not sure why 
it shouldn't be ambiguous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

2024-03-11 Thread via GitHub


peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520115101


##
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with 
SharedSparkSession {
   assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
 }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") 
{
+val df = Seq((1, 2)).toDF("a", "b")
+val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+checkAnswer(
+  df3,
+  Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === 
df2("aa")).select(df1("a").as("aaa"),
+  df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join1 = proj1.child.asInstanceOf[Join]
+assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === 
df1("a")).select(df1Joindf2("aa"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+val join2 = proj2.child.asInstanceOf[Join]
+assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute 
aliased") {
+val df1 = Seq((1, 2)).toDF("a", "b")
+val df2 = Seq((1, 2)).toDF("aa", "bb")
+val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), 
df2("aa"), df1("b"))
+
+assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+  Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === 
df1("a")).select(df1Joindf2("a"),
+  df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   cc @cloud-fan 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SQL] Bind JDBC dialect to JDBCRDD at construction [spark]

2024-03-11 Thread via GitHub


johnnywalker commented on code in PR #45410:
URL: https://github.com/apache/spark/pull/45410#discussion_r1520132359


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala:
##
@@ -153,12 +153,12 @@ object JDBCRDD extends Logging {
  */
 class JDBCRDD(

Review Comment:
   Thanks @utkarshwealthy for the proposal. Would something like this be 
acceptable instead?
   
   ```scala
   diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
   index 7eff4bd376b..024e7327de7 100644
   --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
   +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
   @@ -142,6 +142,7 @@ object JDBCRDD extends Logging {
  limit,
  sortOrders,
  offset)
   +  .withDialect(dialect)
  }
  // scalastyle:on argcount
}
   @@ -174,7 +175,21 @@ class JDBCRDD(
sparkContext,
name = "JDBC query execution time")

   -  private lazy val dialect = JdbcDialects.get(url)
   +  /**
   +   * Dialect to use instead of inferring it from the URL.
   +   */
   +  private var prescribedDialect: Option[JdbcDialect] = None
   +
   +  private lazy val dialect = 
prescribedDialect.getOrElse(JdbcDialects.get(url))
   +
   +  /**
   +   * Prescribe a particular dialect to use for this RDD. If not set, the 
dialect will be automatically
   +   * resolved from the JDBC URL. This previous behavior is preserved for 
binary compatibility.
   +   */
   +  def withDialect(dialect: JdbcDialect): JDBCRDD = {
   +prescribedDialect = Some(dialect)
   +this
   +  }

  def generateJdbcQuery(partition: Option[JDBCPartition]): String = {
// H2's JDBC driver does not support the setSchema() method.  We pass a
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >