[GitHub] [spark] wangyum opened a new pull request, #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode

2023-03-31 Thread via GitHub


wangyum opened a new pull request, #40616:
URL: https://github.com/apache/spark/pull/40616

   ### What changes were proposed in this pull request?
   
   1. Move `ResolveBinaryArithmetic` from `Analyzer` to `TypeCoercion` and `AnsiTypeCoercion`.
   2. Update `ResolveBinaryArithmetic` in `AnsiTypeCoercion` to support only datetime types +/- intervals.
   
   ### Why are the changes needed?
   
   Avoid potential data issues.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Disable string +/- interval in ANSI mode.
   
   ### How was this patch tested?
   
   Update existing test.
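
A toy model of the behavior change, for illustration only. The type names and the `resolve_plus_interval` helper are hypothetical and are not Spark's Catalyst API. The sketch assumes only what the description states: in ANSI mode the rule accepts datetime types next to an interval and nothing else. The non-ANSI handling of a string operand is simplified here to "implicitly cast to timestamp", which is an assumption.

```python
# Hypothetical type names; not Spark's DataType objects.
DATETIME_TYPES = {"date", "timestamp", "timestamp_ntz"}


def resolve_plus_interval(operand_type: str, ansi_enabled: bool) -> str:
    """Return the type the non-interval operand is treated as in `operand +/- interval`."""
    if operand_type in DATETIME_TYPES:
        return operand_type
    if operand_type == "string" and not ansi_enabled:
        # Assumed legacy behavior: the string is implicitly cast to a timestamp.
        return "timestamp"
    # In this model, ANSI mode rejects every non-datetime operand, strings included.
    raise TypeError(f"cannot apply +/- interval to {operand_type}")


print(resolve_plus_interval("string", ansi_enabled=False))
```

With `ansi_enabled=True`, the same call raises `TypeError` in this model, which is the user-facing change the PR describes.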


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] itholic opened a new pull request, #40617: [SPARK-42992][PYTHON] Introduce PySparkRuntimeError

2023-03-31 Thread via GitHub


itholic opened a new pull request, #40617:
URL: https://github.com/apache/spark/pull/40617

   ### What changes were proposed in this pull request?
   
   This PR proposes to introduce a new error, `PySparkRuntimeError`, for PySpark.
   
   
   ### Why are the changes needed?
   
   To cover the built-in `RuntimeError` with the PySpark error framework.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, it's an internal improvement to the error framework.
   
   
   ### How was this patch tested?
   
   The existing CI should pass.
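
A minimal sketch of the general pattern, not the PR's actual code: `FrameworkError` and `FrameworkRuntimeError` are hypothetical stand-ins, and the `error_class` / `message_parameters` fields are assumptions chosen for illustration. The point is that an error can belong to a framework hierarchy and still be caught by existing `except RuntimeError` handlers.

```python
from typing import Dict, Optional


class FrameworkError(Exception):
    """Hypothetical base error that carries a structured error class."""

    def __init__(self, error_class: str, message_parameters: Optional[Dict[str, str]] = None):
        self.error_class = error_class
        self.message_parameters = message_parameters or {}
        super().__init__(f"[{error_class}] {self.message_parameters}")


class FrameworkRuntimeError(FrameworkError, RuntimeError):
    """Framework error that is also a built-in RuntimeError."""


try:
    raise FrameworkRuntimeError("EXAMPLE_ERROR", {"arg": "x"})
except RuntimeError as e:
    # Handlers written against the built-in type keep working.
    print(type(e).__name__, e.error_class)
```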
   





[GitHub] [spark] wangyum commented on pull request #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode

2023-03-31 Thread via GitHub


wangyum commented on PR #40616:
URL: https://github.com/apache/spark/pull/40616#issuecomment-1491411368

   cc @cloud-fan @gengliangwang 





[GitHub] [spark] thyecust opened a new pull request, #40618: fix typo in StorageLevel __eq__()

2023-03-31 Thread via GitHub


thyecust opened a new pull request, #40618:
URL: https://github.com/apache/spark/pull/40618

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition

2023-03-31 Thread via GitHub


cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1154099624


##
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala:
##
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
 val rightResultVars = genOneSideJoinVars(
   ctx, rightOutputRow, right, setDefaultValue = true)
 val resultVars = leftResultVars ++ rightResultVars
-val (_, conditionCheck, _) =
-  getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+// Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
   This is very hard to review. Is there any similar code in other join types 
or `ShuffledHashJoinExec`? It seems other places are using 
`splitVarsByCondition` and `evaluateVariables`.






[GitHub] [spark] thyecust closed pull request #40618: fix typo in StorageLevel __eq__()

2023-03-31 Thread via GitHub


thyecust closed pull request #40618: fix typo in StorageLevel __eq__()
URL: https://github.com/apache/spark/pull/40618





[GitHub] [spark] thyecust opened a new pull request, #40619: fix typo in StorageLevel __eq__()

2023-03-31 Thread via GitHub


thyecust opened a new pull request, #40619:
URL: https://github.com/apache/spark/pull/40619

   
   
   ### What changes were proposed in this pull request?
   
   
   Replace `self.deserialized == self.deserialized` with `self.deserialized == other.deserialized` in `StorageLevel.__eq__()`.
   
   ### Why are the changes needed?
   
   
   The original expression is always True, which is likely to be a typo.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No.
   
   ### How was this patch tested?
   
   
   No tests were added; the change relies on GitHub Actions.
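
To illustrate why the original expression is a problem, here is a small sketch with a hypothetical two-field `Level` class (not the real `StorageLevel`). Comparing a field with itself makes the equality check ignore that field entirely.

```python
class Level:
    """Hypothetical stand-in for a class with a field-wise equality check."""

    def __init__(self, use_disk: bool, deserialized: bool):
        self.use_disk = use_disk
        self.deserialized = deserialized

    def buggy_eq(self, other: "Level") -> bool:
        # `self.deserialized == self.deserialized` is always True, so the field is ignored.
        return self.use_disk == other.use_disk and self.deserialized == self.deserialized

    def fixed_eq(self, other: "Level") -> bool:
        # Compare against the other object's field, as the PR proposes.
        return self.use_disk == other.use_disk and self.deserialized == other.deserialized


a = Level(use_disk=True, deserialized=True)
b = Level(use_disk=True, deserialized=False)
print(a.buggy_eq(b))  # True: the differing field goes unnoticed
print(a.fixed_eq(b))  # False
```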





[GitHub] [spark] thyecust opened a new pull request, #40620: fix typo in pyspark/pandas/config.py

2023-03-31 Thread via GitHub


thyecust opened a new pull request, #40620:
URL: https://github.com/apache/spark/pull/40620

   Comparing the checks for `compute.isin_limit` and `plotting.max_rows` suggests that `v is v` is a typo.
   
   
   
   ### What changes were proposed in this pull request?
   
   Replace `v is v >= 0` with `v >= 0`.
   
   ### Why are the changes needed?
   
   Comparing the checks for `compute.isin_limit` and `plotting.max_rows` suggests that `v is v` is a typo.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   By GitHub Actions.
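
For context on why this has no user-facing effect: Python chains comparison operators, so `v is v >= 0` is evaluated as `(v is v) and (v >= 0)`. Since `v is v` is always true, the typo and the fix give the same result. The function names below are hypothetical; the sketch only demonstrates the language rule.

```python
def typo_check(v):
    # Chained comparison: evaluated as `(v is v) and (v >= 0)`.
    return v is v >= 0


def fixed_check(v):
    return v >= 0


for value in (-1, 0, 5):
    # Both forms agree for every input tried here.
    assert typo_check(value) == fixed_check(value)
    print(value, typo_check(value), fixed_check(value))
```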





[GitHub] [spark] liangyu-1 opened a new pull request, #40621: Fix ExecutorAllocationManager cannot allocate new instances when all …

2023-03-31 Thread via GitHub


liangyu-1 opened a new pull request, #40621:
URL: https://github.com/apache/spark/pull/40621

   ### What changes were proposed in this pull request?
   This PR fixes `ExecutorAllocationManager` being unable to allocate new instances when all executors are down.
   
   
   
   ### Why are the changes needed?
   
   As described in the issue 
[SPARK-42972](https://issues.apache.org/jira/browse/SPARK-42972)
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   
   
   I tested this in our cluster, and the Spark Streaming app runs normally.
   





[GitHub] [spark] cloud-fan commented on a diff in pull request #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode

2023-03-31 Thread via GitHub


cloud-fan commented on code in PR #40616:
URL: https://github.com/apache/spark/pull/40616#discussion_r1154114177


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala:
##
@@ -288,6 +290,82 @@ object AnsiTypeCoercion extends TypeCoercionBase {
 }
   }
 
+  // Please see the comments in `TypeCoercion.ResolveBinaryArithmetic`.
+  object ResolveBinaryArithmetic extends TypeCoercionRule {

Review Comment:
   Is it possible to do some code sharing?






[GitHub] [spark] thyecust opened a new pull request, #40622: fix typo in ResourceRequest.equals()

2023-03-31 Thread via GitHub


thyecust opened a new pull request, #40622:
URL: https://github.com/apache/spark/pull/40622

   `vendor == vendor` is always true, so this is likely a typo.
   
   
   
   ### What changes were proposed in this pull request?
   
   Replace `vendor == vendor` with `that.vendor == vendor`, and `discoveryScript == discoveryScript` with `that.discoveryScript == discoveryScript`.
   
   ### Why are the changes needed?
   
   `vendor == vendor` is always true, so this is likely a typo.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   By GitHub Actions.





[GitHub] [spark] liangyu-1 commented on pull request #40621: Fix ExecutorAllocationManager cannot allocate new instances when all …

2023-03-31 Thread via GitHub


liangyu-1 commented on PR #40621:
URL: https://github.com/apache/spark/pull/40621#issuecomment-1491457693

   @Ngone51 @VasilyKolpakov 
   please help me review the PR





[GitHub] [spark] yaooqinn commented on pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name with a schema-Name

2023-03-31 Thread via GitHub


yaooqinn commented on PR #40602:
URL: https://github.com/apache/spark/pull/40602#issuecomment-1491489837

   thanks, merged to master





[GitHub] [spark] yaooqinn closed pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name with a schema-Name

2023-03-31 Thread via GitHub


yaooqinn closed pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot 
qualify a new-table-Name with a schema-Name
URL: https://github.com/apache/spark/pull/40602





[GitHub] [spark] HyukjinKwon commented on pull request #40622: fix typo in ResourceRequest.equals()

2023-03-31 Thread via GitHub


HyukjinKwon commented on PR #40622:
URL: https://github.com/apache/spark/pull/40622#issuecomment-1491509278

   Hey, I think all these typo PRs should have a test if this is an actual 
issue.





[GitHub] [spark] HyukjinKwon commented on pull request #40622: fix typo in ResourceRequest.equals()

2023-03-31 Thread via GitHub


HyukjinKwon commented on PR #40622:
URL: https://github.com/apache/spark/pull/40622#issuecomment-1491509670

   And please file a JIRA if this is an issue, see also 
https://spark.apache.org/contributing.html





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


zhengruifeng commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154257731


##
dev/sparktestsupport/modules.py:
##
@@ -693,6 +693,56 @@ def __hash__(self):
 "pyspark.pandas.tests.test_typedef",
 "pyspark.pandas.tests.test_utils",
 "pyspark.pandas.tests.test_window",
+# unittests - Spark Connect parity tests

Review Comment:
   I think we should move the tests into `pyspark-connect`, and then make `pyspark-connect` also depend on `pyspark_core`.



##
python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py:
##
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin
+from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class BinaryOpsParityTests(
+    BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
+):
+    @unittest.skip("Fails in Spark Connect, should enable.")

Review Comment:
   I think we'd better document the failure reason:
   
   - some need to be enabled
   - some can't be, possibly due to the private method `_jvm` used in the test
   - etc.
   
   but this can be done in follow-ups
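
One way the suggestion could look, as a sketch: each `unittest.skip` carries a reason specific to its test, so the skip report itself records why a test is disabled. The class name, test names, and reason strings below are hypothetical and are not taken from the PR.

```python
import unittest


class ExampleParityTests(unittest.TestCase):
    # Hypothetical tests and reasons, for illustration only.
    @unittest.skip("Fails in Spark Connect; should be enabled once supported.")
    def test_should_be_enabled(self):
        raise AssertionError("not reached")

    @unittest.skip("Cannot run on Spark Connect: the test relies on the private `_jvm`.")
    def test_cannot_run(self):
        raise AssertionError("not reached")


suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleParityTests)
result = unittest.TestResult()
suite.run(result)

# Each entry in `result.skipped` is a (test, reason) pair.
for test, reason in result.skipped:
    print(test.id().rsplit(".", 1)[-1], "->", reason)
```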






[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40611:
URL: https://github.com/apache/spark/pull/40611#discussion_r1154265174


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala:
##
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.arrow
+
+import java.io.{ByteArrayOutputStream, OutputStream}
+import java.lang.invoke.{MethodHandles, MethodType}
+import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger}
+import java.nio.channels.Channels
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.util.Text
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Helper class for converting user objects into arrow batches.
+ */
+class ArrowSerializer[T](
+    private[this] val enc: AgnosticEncoder[T],
+    private[this] val allocator: BufferAllocator,
+    private[this] val timeZoneId: String) {
+  private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId)
+  private val vectors = root.getFieldVectors.asScala
+  private val unloader = new VectorUnloader(root)
+  private val schemaBytes = {
+    // Only serialize the schema once.
+    val bytes = new ByteArrayOutputStream()
+    MessageSerializer.serialize(newChannel(bytes), root.getSchema)
+    bytes.toByteArray
+  }
+  private var i: Int = 0
+
+  private def newChannel(output: OutputStream): WriteChannel = {
+    new WriteChannel(Channels.newChannel(output))
+  }
+
+  /**
+   * The size of the current batch.
+   *
+   * The size computed consist of the size of the schema and the size of the arrow buffers. The
+   * actual batch will be larger than that because of alignment, written IPC tokens, and the
+   * written record batch metadata. The size of the record batch metadata is proportional to the
+   * complexity of the schema.
+   */
+  def sizeInBytes: Long = {
+    // We need to set the row count for getBufferSize to return the actual value.
+    root.setRowCount(i)
+    schemaBytes.length + vectors.map(_.getBufferSize).sum
+  }
+
+  /**
+   * Append a record to the current batch.
+   */
+  def append(record: T): Unit = {
+    serializer.write(i, record)
+    i += 1
+  }
+
+  /**
+   * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]].
+   */
+  def writeIpcStream(output: OutputStream): Unit = {
+    val channel = newChannel(output)
+    root.setRowCount(i)
+    val batch = unloader.getRecordBatch
+    try {
+      channel.write(schemaBytes)
+      MessageSerializer.serialize(channel, batch)
+      ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+    } finally {
+      batch.close()
+    }
+  }
+
+  /**
+   * Reset the serializer.
+   */
+  def reset(): Unit = {
+    i = 0
+    vectors.foreach(_.reset())
+  }
+
+  /**
+   * Close the serializer.
+   */
+  def close(): Unit = root.close()
+}
+
+object ArrowSerializer {
+  import ArrowEncoderUtils._
+
+  /**
+   * Create an [[Iterator]] that

[GitHub] [spark] MaxGekk opened a new pull request, #40623: [WIP][SQL] Parameterized `sql()` with literal args

2023-03-31 Thread via GitHub


MaxGekk opened a new pull request, #40623:
URL: https://github.com/apache/spark/pull/40623

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40611:
URL: https://github.com/apache/spark/pull/40611#discussion_r1154282542


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala:
##
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.arrow
+
+import java.io.{ByteArrayOutputStream, OutputStream}
+import java.lang.invoke.{MethodHandles, MethodType}
+import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger}
+import java.nio.channels.Channels
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.util.Text
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Helper class for converting user objects into arrow batches.
+ */
+class ArrowSerializer[T](
+    private[this] val enc: AgnosticEncoder[T],
+    private[this] val allocator: BufferAllocator,
+    private[this] val timeZoneId: String) {
+  private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId)
+  private val vectors = root.getFieldVectors.asScala
+  private val unloader = new VectorUnloader(root)
+  private val schemaBytes = {
+    // Only serialize the schema once.
+    val bytes = new ByteArrayOutputStream()
+    MessageSerializer.serialize(newChannel(bytes), root.getSchema)
+    bytes.toByteArray
+  }
+  private var i: Int = 0
+
+  private def newChannel(output: OutputStream): WriteChannel = {
+    new WriteChannel(Channels.newChannel(output))
+  }
+
+  /**
+   * The size of the current batch.
+   *
+   * The size computed consist of the size of the schema and the size of the arrow buffers. The
+   * actual batch will be larger than that because of alignment, written IPC tokens, and the
+   * written record batch metadata. The size of the record batch metadata is proportional to the
+   * complexity of the schema.
+   */
+  def sizeInBytes: Long = {
+    // We need to set the row count for getBufferSize to return the actual value.
+    root.setRowCount(i)
+    schemaBytes.length + vectors.map(_.getBufferSize).sum
+  }
+
+  /**
+   * Append a record to the current batch.
+   */
+  def append(record: T): Unit = {
+    serializer.write(i, record)
+    i += 1
+  }
+
+  /**
+   * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]].
+   */
+  def writeIpcStream(output: OutputStream): Unit = {
+    val channel = newChannel(output)
+    root.setRowCount(i)
+    val batch = unloader.getRecordBatch
+    try {
+      channel.write(schemaBytes)
+      MessageSerializer.serialize(channel, batch)
+      ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+    } finally {
+      batch.close()
+    }
+  }
+
+  /**
+   * Reset the serializer.
+   */
+  def reset(): Unit = {
+    i = 0
+    vectors.foreach(_.reset())
+  }
+
+  /**
+   * Close the serializer.
+   */
+  def close(): Unit = root.close()
+}
+
+object ArrowSerializer {
+  import ArrowEncoderUtils._
+
+  /**
+   * Create an [[Iterator]] that

[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40611:
URL: https://github.com/apache/spark/pull/40611#discussion_r1154283195


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala:
##
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.arrow
+
+import java.io.{ByteArrayOutputStream, OutputStream}
+import java.lang.invoke.{MethodHandles, MethodType}
+import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger}
+import java.nio.channels.Channels
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.util.Text
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Helper class for converting user objects into arrow batches.
+ */
+class ArrowSerializer[T](
+private[this] val enc: AgnosticEncoder[T],
+private[this] val allocator: BufferAllocator,
+private[this] val timeZoneId: String) {
+  private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId)
+  private val vectors = root.getFieldVectors.asScala
+  private val unloader = new VectorUnloader(root)
+  private val schemaBytes = {
+// Only serialize the schema once.
+val bytes = new ByteArrayOutputStream()
+MessageSerializer.serialize(newChannel(bytes), root.getSchema)
+bytes.toByteArray
+  }
+  private var i: Int = 0
+
+  private def newChannel(output: OutputStream): WriteChannel = {
+new WriteChannel(Channels.newChannel(output))
+  }
+
+  /**
+   * The size of the current batch.
+   *
+   * The size computed consists of the size of the schema and the size of the arrow buffers. The
+   * actual batch will be larger than that because of alignment, written IPC tokens, and the
+   * written record batch metadata. The size of the record batch metadata is proportional to the
+   * complexity of the schema.
+   */
+  def sizeInBytes: Long = {
+// We need to set the row count for getBufferSize to return the actual value.
+root.setRowCount(i)
+schemaBytes.length + vectors.map(_.getBufferSize).sum
+  }
+
+  /**
+   * Append a record to the current batch.
+   */
+  def append(record: T): Unit = {
+serializer.write(i, record)
+i += 1
+  }
+
+  /**
+   * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]].
+   */
+  def writeIpcStream(output: OutputStream): Unit = {
+val channel = newChannel(output)
+root.setRowCount(i)
+val batch = unloader.getRecordBatch
+try {
+  channel.write(schemaBytes)
+  MessageSerializer.serialize(channel, batch)
+  ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+} finally {
+  batch.close()
+}
+  }
+
+  /**
+   * Reset the serializer.
+   */
+  def reset(): Unit = {
+i = 0
+vectors.foreach(_.reset())
+  }
+
+  /**
+   * Close the serializer.
+   */
+  def close(): Unit = root.close()
+}
+
+object ArrowSerializer {
+  import ArrowEncoderUtils._
+
+  /**
+   * Create an [[Iterator]] that

[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40611:
URL: https://github.com/apache/spark/pull/40611#discussion_r1154304836


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala:
##
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.arrow
+
+import java.io.{ByteArrayOutputStream, OutputStream}
+import java.lang.invoke.{MethodHandles, MethodType}
+import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger}
+import java.nio.channels.Channels
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector}
+import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
+import org.apache.arrow.vector.util.Text
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.DefinedByConstructorParams
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.util.ArrowUtils
+
+/**
+ * Helper class for converting user objects into arrow batches.
+ */
+class ArrowSerializer[T](
+private[this] val enc: AgnosticEncoder[T],
+private[this] val allocator: BufferAllocator,
+private[this] val timeZoneId: String) {
+  private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId)
+  private val vectors = root.getFieldVectors.asScala
+  private val unloader = new VectorUnloader(root)
+  private val schemaBytes = {
+// Only serialize the schema once.
+val bytes = new ByteArrayOutputStream()
+MessageSerializer.serialize(newChannel(bytes), root.getSchema)
+bytes.toByteArray
+  }
+  private var i: Int = 0
+
+  private def newChannel(output: OutputStream): WriteChannel = {
+new WriteChannel(Channels.newChannel(output))
+  }
+
+  /**
+   * The size of the current batch.
+   *
+   * The size computed consists of the size of the schema and the size of the arrow buffers. The
+   * actual batch will be larger than that because of alignment, written IPC tokens, and the
+   * written record batch metadata. The size of the record batch metadata is proportional to the
+   * complexity of the schema.
+   */
+  def sizeInBytes: Long = {
+// We need to set the row count for getBufferSize to return the actual value.
+root.setRowCount(i)
+schemaBytes.length + vectors.map(_.getBufferSize).sum
+  }
+
+  /**
+   * Append a record to the current batch.
+   */
+  def append(record: T): Unit = {
+serializer.write(i, record)
+i += 1
+  }
+
+  /**
+   * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]].
+   */
+  def writeIpcStream(output: OutputStream): Unit = {
+val channel = newChannel(output)
+root.setRowCount(i)
+val batch = unloader.getRecordBatch
+try {
+  channel.write(schemaBytes)
+  MessageSerializer.serialize(channel, batch)
+  ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)
+} finally {
+  batch.close()
+}
+  }
+
+  /**
+   * Reset the serializer.
+   */
+  def reset(): Unit = {
+i = 0
+vectors.foreach(_.reset())
+  }
+
+  /**
+   * Close the serializer.
+   */
+  def close(): Unit = root.close()
+}
+
+object ArrowSerializer {
+  import ArrowEncoderUtils._
+
+  /**
+   * Create an [[Iterator]] that

[GitHub] [spark] itholic opened a new pull request, #40624: [SPARK-42995][CONNECT][PYTHON] Migrate Spark Connect DataFrame errors into error class

2023-03-31 Thread via GitHub


itholic opened a new pull request, #40624:
URL: https://github.com/apache/spark/pull/40624

   ### What changes were proposed in this pull request?
   
   This PR proposes to migrate remaining `TypeError` from 
`python/pyspark/sql/connect/dataframe.py` into `PySparkTypeError`.
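
The kind of migration described here can be sketched in plain Python. This is only an illustration under stated assumptions: `FrameworkTypeError` is a hypothetical stand-in for a framework error such as `PySparkTypeError`, and the `NOT_STR` class name, the parameter names, and `select_column` are chosen for the example rather than taken from this PR.

```python
from typing import Dict, Optional


class FrameworkTypeError(TypeError):
    """Hypothetical stand-in for a framework error such as PySparkTypeError."""

    def __init__(
        self,
        error_class: str,
        message_parameters: Optional[Dict[str, str]] = None,
    ) -> None:
        self.error_class = error_class
        self.message_parameters = dict(message_parameters or {})
        # Render the parameters in a stable order so the message is predictable.
        details = ", ".join(
            f"{key}={value}" for key, value in sorted(self.message_parameters.items())
        )
        super().__init__(f"[{error_class}] {details}")


def select_column(name: object) -> str:
    """Hypothetical API entry point that validates its argument."""
    # Before a migration this would typically be: raise TypeError("name should be a str")
    if not isinstance(name, str):
        raise FrameworkTypeError(
            error_class="NOT_STR",
            message_parameters={"arg_name": "name", "arg_type": type(name).__name__},
        )
    return name
```

Because the stand-in still subclasses `TypeError`, callers that catch the built-in exception keep working, which matches the claim that the change is not user-facing.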
   
   
   ### Why are the changes needed?
   
   To migrate all user-facing errors into the PySpark error framework.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No, it's an internal error framework improvement.
   
   
   ### How was this patch tested?
   
   
   The existing CI should pass.
   





[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


itholic commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154321016


##
dev/sparktestsupport/modules.py:
##
@@ -693,6 +693,56 @@ def __hash__(self):
 "pyspark.pandas.tests.test_typedef",
 "pyspark.pandas.tests.test_utils",
 "pyspark.pandas.tests.test_window",
+# unittests - Spark Connect parity tests

Review Comment:
   That makes sense to me.






[GitHub] [spark] zhmin commented on a diff in pull request #40567: [SPARK-42935] [SQL] Add union required distribution push down

2023-03-31 Thread via GitHub


zhmin commented on code in PR #40567:
URL: https://github.com/apache/spark/pull/40567#discussion_r1154322170


##
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/UnionDistributionPushdown.scala:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnionExec, UnionZipExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * Pushes parent's required distribution to [[UnionExec]] so that we can
+ * remove unnecessary shuffle exchange if any child output partitioning matches
+ * 1) the operator is UnionExec
+ * 2) the operator's output partitioning can not satisfy parent distribution
+ * 3) the operator's parent requires hashing or clustering distribution
+ * 4) the data types of operator's children output attributes are the same
+ */
+case class UnionDistributionPushdown() extends Rule[SparkPlan] {
+
+  def allChildrenSameDataType(unionExec: UnionExec): Boolean = {
+(Seq(unionExec) ++ unionExec.children).map(_.output).transpose.map(attrs => {
+  val firstAttr = attrs.head
+  attrs.forall(_.dataType == firstAttr.dataType)
+}).forall(_ == true)
+  }
+
+  def createNewChildrenDistribution(unionExec: UnionExec,
+unionRequired: Distribution): Seq[Distribution] = {
+unionExec.children.map(unionChild => {
+  // create output attributes map (union output exprId -> child output)
+  val attributes = unionExec.output.map(_.exprId).zip(unionChild.output).toMap
+  unionRequired match {
+case distribution: ClusteredDistribution =>
+  val newExpressions = distribution.clustering.map(exp => exp.transformUp {
+case attr: Attribute if attributes.contains(attr.exprId) => attributes(attr.exprId)
+  })
+  ClusteredDistribution(newExpressions, distribution.requireAllClusterKeys,
+distribution.requiredNumPartitions)
+
+case distribution => distribution
+  }
+})
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+case operator: SparkPlan if conf.getConf(SQLConf.UNION_REQUIRED_DISTRIBUTION_PUSHDOWN) &&
+  operator.children.exists(_.isInstanceOf[UnionExec]) =>
+  val newChildren = operator.children.zipWithIndex.map {
+case (unionExec: UnionExec, childIndex: Int) if !unionExec.outputPartitioning.
+  satisfies(operator.requiredChildDistribution(childIndex)) =>
+  val unionRequired = operator.requiredChildDistribution(childIndex)
+  unionRequired match {
+case _: ClusteredDistribution if allChildrenSameDataType(unionExec) =>
+  val requiredChildDistribution: Seq[Distribution] =
+createNewChildrenDistribution(unionExec, unionRequired)
+  val numPartitions = unionRequired.requiredNumPartitions
+.getOrElse(conf.numShufflePartitions)
+  val outputPartitioning = unionRequired.createPartitioning(numPartitions)
+  UnionZipExec(unionExec.children, requiredChildDistribution,
+outputPartitioning)
+
+case _: Distribution => unionExec
+  }
+
+case (child, _) => child
+  }
+  operator.withNewChildren(newChildren)
+  }
+}
+
+/**
+ * Class representing partitions of UnionZipRDD, which maintains the list of
+ * corresponding partitions of parent RDDs.
+ */
+private[spark]
+class UnionZipRDDPartition(@transient val rdds: Seq[RDD[_]],
+   override val index: Int) extends Partition {
+  var parents: Array[Partition] = rdds.map(_.partitions(index)).toArray
+
+  override def hashCode(): Int = index
+
+  override def equals(other: Any):

[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


itholic commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154322879


##
python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py:
##
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin
+from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class BinaryOpsParityTests(
+BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
+):
+@unittest.skip("Fails in Spark Connect, should enable.")

Review Comment:
   Good points. I'm compiling a list of the failures for each test.
   Let me address them in follow-ups to avoid adding further complexity to this PR. 😂






[GitHub] [spark] zhmin commented on a diff in pull request #40567: [SPARK-42935] [SQL] Add union required distribution push down

2023-03-31 Thread via GitHub


zhmin commented on code in PR #40567:
URL: https://github.com/apache/spark/pull/40567#discussion_r1154322505


##
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/UnionDistributionPushdown.scala:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import java.io.{IOException, ObjectOutputStream}
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnionExec, UnionZipExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * Pushes parent's required distribution to [[UnionExec]] so that we can
+ * remove unnecessary shuffle exchange if any child output partitioning matches
+ * 1) the operator is UnionExec
+ * 2) the operator's output partitioning can not satisfy parent distribution
+ * 3) the operator's parent requires hashing or clustering distribution
+ * 4) the data types of operator's children output attributes are the same
+ */
+case class UnionDistributionPushdown() extends Rule[SparkPlan] {
+
+  def allChildrenSameDataType(unionExec: UnionExec): Boolean = {
+(Seq(unionExec) ++ unionExec.children).map(_.output).transpose.map(attrs => {
+  val firstAttr = attrs.head
+  attrs.forall(_.dataType == firstAttr.dataType)
+}).forall(_ == true)
+  }
+
+  def createNewChildrenDistribution(unionExec: UnionExec,
+unionRequired: Distribution): Seq[Distribution] = {
+unionExec.children.map(unionChild => {
+  // create output attributes map (union output exprId -> child output)
+  val attributes = unionExec.output.map(_.exprId).zip(unionChild.output).toMap
+  unionRequired match {
+case distribution: ClusteredDistribution =>
+  val newExpressions = distribution.clustering.map(exp => exp.transformUp {
+case attr: Attribute if attributes.contains(attr.exprId) => attributes(attr.exprId)
+  })
+  ClusteredDistribution(newExpressions, distribution.requireAllClusterKeys,
+distribution.requiredNumPartitions)
+
+case distribution => distribution
+  }
+})
+  }
+
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+case operator: SparkPlan if conf.getConf(SQLConf.UNION_REQUIRED_DISTRIBUTION_PUSHDOWN) &&
+  operator.children.exists(_.isInstanceOf[UnionExec]) =>
+  val newChildren = operator.children.zipWithIndex.map {
+case (unionExec: UnionExec, childIndex: Int) if !unionExec.outputPartitioning.
+  satisfies(operator.requiredChildDistribution(childIndex)) =>
+  val unionRequired = operator.requiredChildDistribution(childIndex)
+  unionRequired match {
+case _: ClusteredDistribution if allChildrenSameDataType(unionExec) =>
+  val requiredChildDistribution: Seq[Distribution] =
+createNewChildrenDistribution(unionExec, unionRequired)
+  val numPartitions = unionRequired.requiredNumPartitions
+.getOrElse(conf.numShufflePartitions)
+  val outputPartitioning = unionRequired.createPartitioning(numPartitions)
+  UnionZipExec(unionExec.children, requiredChildDistribution,
+outputPartitioning)
+
+case _: Distribution => unionExec
+  }
+
+case (child, _) => child
+  }
+  operator.withNewChildren(newChildren)
+  }
+}
+
+/**
+ * Class representing partitions of UnionZipRDD, which maintains the list of
+ * corresponding partitions of parent RDDs.
+ */
+private[spark]
+class UnionZipRDDPartition(@transient val rdds: Seq[RDD[_]],
+   override val index: Int) extends Partition {
+  var parents: Array[Partition] = rdds.map(_.partitions(index)).toArray
+
+  override def hashCode(): Int = index
+
+  override def equals(other: Any):

[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


itholic commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154324225


##
python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py:
##
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin
+from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class BinaryOpsParityTests(
+BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
+):
+@unittest.skip("Fails in Spark Connect, should enable.")

Review Comment:
   I created a ticket so we don't forget: SPARK-42996
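
One way to keep a skipped test traceable is to put the ticket number in the skip reason itself. The sketch below is a self-contained illustration: the class name, test bodies, and reason wording are hypothetical, and only the ticket number comes from the comment above.

```python
import unittest


class BinaryOpsParityExample(unittest.TestCase):
    # The reason string is hypothetical; it simply embeds the tracking ticket.
    @unittest.skip("TODO(SPARK-42996): fails in Spark Connect, re-enable once fixed.")
    def test_add(self):
        self.fail("not reached while the test is skipped")

    def test_sanity(self):
        self.assertEqual(1 + 1, 2)


def run_example() -> unittest.TestResult:
    """Run the example case and return the collected result."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(BinaryOpsParityExample)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

The skip reason then shows up in the test report, so searching the output for the ticket number lists every test waiting on it.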






[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


itholic commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154324225


##
python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py:
##
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin
+from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class BinaryOpsParityTests(
+BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
+):
+@unittest.skip("Fails in Spark Connect, should enable.")

Review Comment:
   I created a ticket so we don't forget: SPARK-42996






[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect

2023-03-31 Thread via GitHub


itholic commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154325557


##
dev/sparktestsupport/modules.py:
##
@@ -693,6 +693,56 @@ def __hash__(self):
 "pyspark.pandas.tests.test_typedef",
 "pyspark.pandas.tests.test_utils",
 "pyspark.pandas.tests.test_window",
+# unittests - Spark Connect parity tests

Review Comment:
   Updated. Thanks! :-)






[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error

2023-03-31 Thread via GitHub


shrprasa commented on PR #40258:
URL: https://github.com/apache/spark/pull/40258#issuecomment-1491795763

   
   
   
   
   > according to the [code in 2.3](https://github.com/apache/spark/blob/branch-2.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L190), I think we should call `distinct` in line 345
   
   @cloud-fan 
   Yes, that would also work, but making the change there would extend its impact to many more scenarios,
   whereas calling `distinct` where I did keeps the scope very limited.
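
The effect of calling `distinct` before the ambiguity check can be sketched as follows. This is a simplified model and not Spark's resolution code: `Attr`, `resolve`, and the case-insensitive match are assumptions made for the illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Attr:
    name: str
    expr_id: int  # plays the role of a unique attribute id


def resolve(name: str, candidates: List[Attr]) -> Attr:
    """Resolve a column name, treating repeats of one attribute as a single match."""
    matches = [a for a in candidates if a.name.lower() == name.lower()]
    # dict.fromkeys keeps first-seen order while dropping repeats of the same attribute.
    distinct = list(dict.fromkeys(matches))
    if not distinct:
        raise LookupError(f"cannot resolve {name!r}")
    if len(distinct) > 1:
        raise LookupError(f"reference {name!r} is ambiguous")
    return distinct[0]
```

With the deduplication in place, the same attribute appearing twice in the candidate list no longer triggers an ambiguity error, while two genuinely different attributes with the same name still do.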
   





[GitHub] [spark] johanl-db commented on a diff in pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

2023-03-31 Thread via GitHub


johanl-db commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1154397094


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##
@@ -220,9 +220,20 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
   attributeReference match {
 case attr @ FileSourceMetadataAttribute(

Review Comment:
   We only clean up the individual metadata fields; the `_metadata` attribute itself still carries the information needed to match on `FileSourceMetadataAttribute`:
   ```
 def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
   // Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
   // avoids confusion by mapping back to [[metadataSchemaFields]].
   val fields = metadataSchemaFields(fileFormat)
 .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
   FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields))
 }
   ```
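
The distinction between field-level and attribute-level metadata in the snippet above can be modeled with a small Python sketch. All names here (`Field`, `StructAttr`, `METADATA_COL_KEY`, `create_metadata_col`) are hypothetical and only mirror the shape of the Scala code; they are not Spark APIs.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, List


@dataclass(frozen=True)
class Field:
    name: str
    metadata: Dict[str, bool] = field(default_factory=dict)


@dataclass(frozen=True)
class StructAttr:
    name: str
    fields: List[Field]
    metadata: Dict[str, bool]


METADATA_COL_KEY = "is_metadata_col"  # hypothetical marker key


def create_metadata_col(fields: List[Field]) -> StructAttr:
    """Strip per-field metadata but keep the marker on the struct attribute itself."""
    cleaned = [replace(f, metadata={}) for f in fields]
    return StructAttr("_metadata", cleaned, {METADATA_COL_KEY: True})
```

In this model, a matcher that looks only at the attribute's own metadata still recognizes the column even though every nested field has been cleaned.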
   The following is true:
   ```
   val metadata_attr = createFileMetadataCol(fileFormat)
   metadata_attr.metadata.getBoolean(METADATA_COL_ATTR_KEY) == true
   metadata_attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) == true
   
   metadata_attr.dataType.forall(_.metadata == Metadata.empty)
   ```
   
   






[GitHub] [spark] cloud-fan commented on pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

2023-03-31 Thread via GitHub


cloud-fan commented on PR #40545:
URL: https://github.com/apache/spark/pull/40545#issuecomment-1491879620

   thanks, merging to master!





[GitHub] [spark] cloud-fan closed pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

2023-03-31 Thread via GitHub


cloud-fan closed pull request #40545: [SPARK-42918] Generalize handling of 
metadata attributes in FileSourceStrategy
URL: https://github.com/apache/spark/pull/40545





[GitHub] [spark] cloud-fan commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error

2023-03-31 Thread via GitHub


cloud-fan commented on PR #40258:
URL: https://github.com/apache/spark/pull/40258#issuecomment-1491886464

   If you really worry about regression, we can add a legacy config to fall 
back to the old code. I don't agree to make code changes that only fix the 
problem in one particular code path, while we know other code paths have the 
same problem as well.





[GitHub] [spark] allisonwang-db commented on a diff in pull request #40575: [SPARK-42945][CONNECT] Support PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect

2023-03-31 Thread via GitHub


allisonwang-db commented on code in PR #40575:
URL: https://github.com/apache/spark/pull/40575#discussion_r1154486470


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -109,23 +118,41 @@ class SparkConnectService(debug: Boolean)
*/
   private def handleError[V](
   opType: String,
-  observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = {
-case se: SparkException if isPythonExecutionException(se) =>
-  logError(s"Error during: $opType", se)
-  observer.onError(
-
StatusProto.toStatusRuntimeException(buildStatusFromThrowable(se.getCause)))
-
-case e: Throwable if e.isInstanceOf[SparkThrowable] || NonFatal.apply(e) =>
-  logError(s"Error during: $opType", e)
-  
observer.onError(StatusProto.toStatusRuntimeException(buildStatusFromThrowable(e)))
-
-case e: Throwable =>
-  logError(s"Error during: $opType", e)
-  observer.onError(
-Status.UNKNOWN
-  .withCause(e)
-  .withDescription(StringUtils.abbreviate(e.getMessage, 2048))
-  .asRuntimeException())
+  observer: StreamObserver[V],
+  userId: String,
+  sessionId: String): PartialFunction[Throwable, Unit] = {
+val session =
+  SparkConnectService
+.getOrCreateIsolatedSession(userId, sessionId)
+.session
+val stackTraceEnabled = try {
+  session.conf.get(
+
org.apache.spark.sql.internal.SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED.key).toBoolean
+} catch {
+  case NonFatal(_) => true
+}
+
+{

Review Comment:
   Do you mean the try-catch block or the partial function block?
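   For readers following the hunk above, the two blocks in question can be 
   sketched roughly as follows. This is a simplified, self-contained 
   illustration only: `flagOrDefault`, the `Map`-based config, and the `String` 
   result are stand-ins invented here, not the PR's actual types or helpers.

   ```scala
   import scala.util.control.NonFatal

   object ErrorHandlingSketch {
     // The try-catch block: read a boolean flag, falling back to a default
     // when the key is missing or the value does not parse as a boolean.
     def flagOrDefault(conf: Map[String, String], key: String, default: Boolean): Boolean =
       try {
         conf(key).toBoolean
       } catch {
         case NonFatal(_) => default
       }

     // The partial function block: it closes over the flag computed above
     // and decides how each Throwable is reported.
     def handleError(stackTraceEnabled: Boolean): PartialFunction[Throwable, String] = {
       case NonFatal(e) if stackTraceEnabled =>
         s"${e.getClass.getName}: ${e.getMessage} (with stack trace)"
       case NonFatal(e) =>
         s"${e.getClass.getName}: ${e.getMessage}"
     }
   }
   ```

   The flag is evaluated once, before the partial function is returned, so every 
   case inside it sees the same value.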






[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40574: [SPARK-42942][SQL] Support coalesce table cache stage partitions

2023-03-31 Thread via GitHub


jaceklaskowski commented on code in PR #40574:
URL: https://github.com/apache/spark/pull/40574#discussion_r1154480284


##
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQERead.scala:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, 
UnknownPartitioning}
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, 
UnaryExecNode}
+
+abstract class AQERead extends UnaryExecNode {
+  def child: SparkPlan
+  def partitionSpecs: Seq[ShufflePartitionSpec]
+
+  assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at 
least one partition")
+
+  override final def output: Seq[Attribute] = child.output
+  override final def supportsColumnar: Boolean = child.supportsColumnar
+  override final def supportsRowBased: Boolean = child.supportsRowBased
+
+  def outputPartitionWithCoalesced(numPartitions: Int): Partitioning = {
+// For coalesced shuffle read, the data distribution is not changed, only 
the number of
+// partitions is changed.
+child.outputPartitioning match {
+  case h: HashPartitioning =>
+CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = 
numPartitions))
+  case r: RangePartitioning =>
+CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = 
numPartitions))
+  // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses
+  // `RoundRobinPartitioning` but we don't need to retain the number of 
partitions.
+  case r: RoundRobinPartitioning =>
+r.copy(numPartitions = numPartitions)
+  case other@SinglePartition =>

Review Comment:
   nit: spaces around `@`?
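   As a small illustration of the nit, a pattern binder written with spaces 
   around `@` might look like the sketch below. The `Partitioning` types here are 
   hypothetical stand-ins defined locally, not Spark's classes. Both spellings 
   compile; the spaced form is simply the more common style.

   ```scala
   object BinderSketch {
     // Hypothetical stand-ins for a partitioning hierarchy.
     sealed trait Partitioning
     case object SinglePartition extends Partitioning
     final case class HashPartitioning(numPartitions: Int) extends Partitioning

     def describe(p: Partitioning): String = p match {
       // `name @ pattern` binds the whole matched value to `name`.
       case h @ HashPartitioning(n) => s"hash with $n partitions: $h"
       case other @ SinglePartition => s"single partition: $other"
     }
   }
   ```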




[GitHub] [spark] allisonwang-db commented on a diff in pull request #40575: [SPARK-42945][CONNECT] Support PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect

2023-03-31 Thread via GitHub


allisonwang-db commented on code in PR #40575:
URL: https://github.com/apache/spark/pull/40575#discussion_r1154490873


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -73,18 +74,26 @@ class SparkConnectService(debug: Boolean)
 classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable): RPCStatus = {
+  private def buildStatusFromThrowable(
+  st: Throwable,
+  stackTraceEnabled: Boolean = false): RPCStatus = {
+val errorInfo = ErrorInfo
+  .newBuilder()
+  .setReason(st.getClass.getName)
+  .setDomain("org.apache.spark")
+  .putMetadata("classes", 
compact(render(allClasses(st.getClass).map(_.getName
+
+val withStackTrace = if (stackTraceEnabled) {
+  val stackTrace = ExceptionUtils.getStackTrace(st)
+  errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace, 
4096))

Review Comment:
   This is a good question. I wonder if we should make this configurable for 
users.



##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -73,18 +74,26 @@ class SparkConnectService(debug: Boolean)
 classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable): RPCStatus = {
+  private def buildStatusFromThrowable(
+  st: Throwable,
+  stackTraceEnabled: Boolean = false): RPCStatus = {
+val errorInfo = ErrorInfo
+  .newBuilder()
+  .setReason(st.getClass.getName)
+  .setDomain("org.apache.spark")
+  .putMetadata("classes", 
compact(render(allClasses(st.getClass).map(_.getName
+
+val withStackTrace = if (stackTraceEnabled) {
+  val stackTrace = ExceptionUtils.getStackTrace(st)
+  errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace, 
4096))
+} else {
+  errorInfo
+}
+
 RPCStatus
   .newBuilder()
   .setCode(RPCCode.INTERNAL_VALUE)
-  .addDetails(
-ProtoAny.pack(
-  ErrorInfo
-.newBuilder()
-.setReason(st.getClass.getName)
-.setDomain("org.apache.spark")
-.putMetadata("classes", 
compact(render(allClasses(st.getClass).map(_.getName
-.build()))
+  .addDetails(ProtoAny.pack(withStackTrace.build()))

Review Comment:
   The stack trace is disabled by default, so it should not affect typical 
users. But I agree it can be hard to debug such issues. One solution could be 
to make the stack trace size configurable. 
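   A minimal sketch of what a configurable size could look like, using only the 
   JDK. The `maxSize` parameter and the `metadataFor` helper are assumptions made 
   for illustration; they are not existing Spark configs or APIs, and the real 
   code builds an `ErrorInfo` rather than a `Map`.

   ```scala
   import java.io.{PrintWriter, StringWriter}

   object StackTraceSketch {
     // Render a Throwable's stack trace to a String using only the JDK.
     def stackTraceOf(t: Throwable): String = {
       val sw = new StringWriter()
       t.printStackTrace(new PrintWriter(sw, true))
       sw.toString
     }

     // Truncate to at most maxSize characters, ending with "..." when cut.
     // maxSize is a hypothetical user-facing setting.
     def truncated(trace: String, maxSize: Int): String = {
       require(maxSize >= 4, "maxSize must leave room for the ellipsis")
       if (trace.length <= maxSize) trace else trace.take(maxSize - 3) + "..."
     }

     // Attach the (possibly truncated) stack trace only when enabled.
     def metadataFor(
         t: Throwable,
         stackTraceEnabled: Boolean,
         maxSize: Int): Map[String, String] = {
       val base = Map("reason" -> t.getClass.getName)
       if (stackTraceEnabled) base + ("stackTrace" -> truncated(stackTraceOf(t), maxSize))
       else base
     }
   }
   ```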






[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error

2023-03-31 Thread via GitHub


shrprasa commented on PR #40258:
URL: https://github.com/apache/spark/pull/40258#issuecomment-1491954910

   > If you really worry about regression, we can add a legacy config to fall 
back to the old code. I don't agree to make code changes that only fix the 
problem in one particular code path, while we know other code paths have the 
same problem as well.
   
   Ok, I will update the PR with suggested change.





[GitHub] [spark] Hisoka-X commented on a diff in pull request #40609: [SPARK-42316][SQL] Assign name to _LEGACY_ERROR_TEMP_2044

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40609:
URL: https://github.com/apache/spark/pull/40609#discussion_r1154512853


##
sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala:
##
@@ -625,6 +625,20 @@ class QueryExecutionErrorsSuite
 }
   }
 
+  test("BINARY_ARITHMETIC_CAUSE_OVERFLOW: byte plus byte result overflow") {
+checkError(
+  exception = intercept[SparkArithmeticException] {
+sql(s"select CAST('5' AS TINYINT) + CAST('5' AS TINYINT)").collect()
+  },
+  errorClass = "BINARY_ARITHMETIC_CAUSE_OVERFLOW",
+  parameters = Map(
+"value1" -> "5",
+"symbol" -> "+",

Review Comment:
   > Hi, I used `select CAST('32767' AS TINYINT) + CAST('32767' AS TINYINT)` but 
the result is null. Is that acceptable? If not, I will fix it. 
Screenshot: https://user-images.githubusercontent.com/32387433/229014398-cbae18d7-a7bb-4de2-a44e-a44703ce0f99.png
   
   Forget it, I know why. Thanks.
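   For context, a TINYINT holds values from -128 to 127, so 5 + 5 stays in range 
   while 127 + 1 does not. The sketch below is a plain-Scala illustration of 
   overflow-checked byte addition; `addExact` is a hypothetical helper written for 
   this note, not the Spark implementation under test.

   ```scala
   object ByteOverflowSketch {
     // Adds two bytes, throwing instead of silently wrapping around.
     def addExact(a: Byte, b: Byte): Byte = {
       val sum: Int = a + b // Byte operands are widened to Int before adding
       if (sum < Byte.MinValue || sum > Byte.MaxValue) {
         throw new ArithmeticException(s"$a + $b caused overflow")
       }
       sum.toByte
     }
   }
   ```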






[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40607: [SPARK-42993][ML][CONNECT] Make PyTorch Distributor support Spark Connect

2023-03-31 Thread via GitHub


jaceklaskowski commented on code in PR #40607:
URL: https://github.com/apache/spark/pull/40607#discussion_r1154524221


##
python/pyspark/ml/torch/distributor.py:
##
@@ -581,12 +590,12 @@ def _run_distributed_training(
 f"Started distributed training with {self.num_processes} executor 
proceses"

Review Comment:
   s/proceses/processes






[GitHub] [spark] dongjoon-hyun commented on pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer

2023-03-31 Thread via GitHub


dongjoon-hyun commented on PR #40589:
URL: https://github.com/apache/spark/pull/40589#issuecomment-1492257092

   cc @sunchao , too





[GitHub] [spark] dongjoon-hyun closed pull request #40596: [SPARK-42973][CONNECT][BUILD] Upgrade buf to v1.16.0

2023-03-31 Thread via GitHub


dongjoon-hyun closed pull request #40596: [SPARK-42973][CONNECT][BUILD] Upgrade 
buf to v1.16.0
URL: https://github.com/apache/spark/pull/40596





[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-31 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154700264


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   So the type of value in the state will be Long? SGTM.






[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark

2023-03-31 Thread via GitHub


rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154705202



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
 
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+keyExpressions: Seq[Attribute],
+child: SparkPlan,
+stateInfo: Option[StatefulOperatorStateInfo] = None,
+eventTimeWatermarkForLateEvents: Option[Long] = None,
+eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
   If we are saving a long, I would suggest changing the name to `expiresAtMs` or 
`expiresAtMicros`, depending on its unit. 
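   To illustrate the naming point, here is a small sketch where the unit suffix 
   makes the arithmetic self-documenting. `DedupState`, `newState`, and 
   `isExpired` are hypothetical names, and the expiry semantics shown (event time 
   plus a delay, compared against the watermark) are an assumption for the 
   example rather than a description of the PR.

   ```scala
   object ExpirySketch {
     // The unit suffix on the field name documents how to read the Long.
     final case class DedupState(expiresAtMs: Long)

     // Assumed semantics: state expires a fixed delay after the event time.
     def newState(eventTimeMs: Long, delayThresholdMs: Long): DedupState =
       DedupState(expiresAtMs = eventTimeMs + delayThresholdMs)

     // State can be evicted once the watermark reaches the expiry time.
     def isExpired(state: DedupState, watermarkMs: Long): Boolean =
       state.expiresAtMs <= watermarkMs
   }
   ```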






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer

2023-03-31 Thread via GitHub


dongjoon-hyun commented on code in PR #40589:
URL: https://github.com/apache/spark/pull/40589#discussion_r1154735386


##
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala:
##
@@ -166,6 +177,14 @@ class SparkSessionExtensions {
 runtimeOptimizerRules += builder
   }
 
+  /**
+   * Inject a rule that can override the query stage optimizer phase of 
adaptive query
+   * execution.
+   */
+  def injectQueryStageOptimizerRule(builder: QueryStagePrepRuleBuilder): Unit 
= {

Review Comment:
   Oh, this looks like a typo for `QueryStageOptimizerRuleBuilder`. We need a 
builder of type `QueryStageOptimizerRuleBuilder` instead of 
`QueryStagePrepRuleBuilder`, right?






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer

2023-03-31 Thread via GitHub


dongjoon-hyun commented on code in PR #40589:
URL: https://github.com/apache/spark/pull/40589#discussion_r1154736185


##
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala:
##
@@ -1161,3 +1177,16 @@ object AddLimit extends Rule[LogicalPlan] {
 case _ => Limit(Literal(1), plan)
   }
 }
+
+object RequireAtLeaseTwoPartitions extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+val readOpt = plan.find(_.isInstanceOf[AQEShuffleReadExec])
+if (readOpt.exists(_.outputPartitioning.numPartitions == 1)) {
+  plan.transform {
+case read: AQEShuffleReadExec => read.child
+  }
+} else {
+ plan

Review Comment:
   indentation?






[GitHub] [spark] arunsimhateachmint opened a new pull request, #40625: [TDE-88] Access control on spark-3.3.2

2023-03-31 Thread via GitHub


arunsimhateachmint opened a new pull request, #40625:
URL: https://github.com/apache/spark/pull/40625

   
   
   ### What changes were proposed in this pull request?
   Implement access control on a database via row level security on metadata
   
   
   
   ### Why are the changes needed?
   Implementing this would bring access control to spark-standalone
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   Using doAs for the thrift-server, or --proxy-user for the shell, would restrict access
   
   
   
   ### How was this patch tested?
   Created 2 users (with row level restrictions on the metadata db)
   
   - To test --proxy-user: started a shell with the --proxy-user flag and ran show 
tables / count rows on a table (with access / without access)
   
   - To test doAs: started a thrift server, connected as a test user via beeline, 
and ran show tables / count rows on a table (with access / without access)
   
   
   
   
   





[GitHub] [spark] arunsimhateachmint closed pull request #40625: [TDE-88] Access control on spark-3.3.2

2023-03-31 Thread via GitHub


arunsimhateachmint closed pull request #40625: [TDE-88] Access control on 
spark-3.3.2 
URL: https://github.com/apache/spark/pull/40625





[GitHub] [spark] ritikam2 commented on pull request #18994: [SPARK-21784][SQL] Adds support for defining informational primary key and foreign key constraints using ALTER TABLE DDL.

2023-03-31 Thread via GitHub


ritikam2 commented on PR #18994:
URL: https://github.com/apache/spark/pull/18994#issuecomment-1492432645

   Hi Suresh, I am trying to look into your changes, but I am not able to find 
the following.
   In SparkSqlParser, I cannot find `AddTableConstraintContext`:
   ```
   override def visitAddTableConstraint(
       ctx: AddTableConstraintContext): LogicalPlan = withOrigin(ctx) {
     val tableIdentifier = visitTableIdentifier(ctx.tableIdentifier)
   ```
   In SparkSqlParser, I cannot find `TableConstraintContext`:
   ```
   override def visitTableConstraint(
       ctx: TableConstraintContext): TableConstraint = withOrigin(ctx) {
     val keyColNames = visitIdentifierList(ctx.keyColNames).toArray
   ```





[GitHub] [spark] clownxc opened a new pull request, #40626: [SPARK-42860][SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode

2023-03-31 Thread via GitHub


clownxc opened a new pull request, #40626:
URL: https://github.com/apache/spark/pull/40626

   ### What changes were proposed in this pull request?
   
   This PR proposes an explain mode that prints only the parsed and analysed 
logical plans.
   
   ### Why are the changes needed?
   To validate the SQL query before submitting the job. We currently use 
`df.explain(extended=true)`, which generates the parsed, analysed, and optimised 
logical plans and the physical plan, but generating the optimised logical plan 
sometimes takes extra time.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes
   ### How was this patch tested?
   Pass GitHub Actions
   





[GitHub] [spark] bjornjorgensen commented on pull request #40622: fix typo in ResourceRequest.equals()

2023-03-31 Thread via GitHub


bjornjorgensen commented on PR #40622:
URL: https://github.com/apache/spark/pull/40622#issuecomment-1492468583

   @thyecust "vendor == vendor is always true"
   
   In line 84 the val vendor field is declared as Optional[String], which means 
it can have a value of Some(String) when a vendor name is provided, or None 
when no vendor name is specified.
   
   The failed test is not related to this. Can you restart the failed test? 
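   
   For anyone skimming the thread, the quoted observation can be reproduced with 
   a small stand-alone sketch. `Request` below is a hypothetical simplified class, 
   not Spark's `ResourceRequest`: comparing a field with itself holds no matter 
   what the field contains, so that clause never distinguishes two objects.

   ```scala
   object EqualsSketch {
     // Hypothetical simplified request type for illustration only.
     class Request(val name: String, val vendor: Option[String]) {
       // Self-comparison: `vendor == vendor` is true for Some(...) and None alike.
       def selfComparingEquals(that: Request): Boolean =
         name == that.name && vendor == vendor

       // Compares this object's field with the other object's field.
       def fieldComparingEquals(that: Request): Boolean =
         name == that.name && vendor == that.vendor
     }
   }
   ```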
   





[GitHub] [spark] dtenedor commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch

2023-03-31 Thread via GitHub


dtenedor commented on code in PR #40615:
URL: https://github.com/apache/spark/pull/40615#discussion_r1154843262


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, 
IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+    // scalastyle:off caselocale
+    new HllSketch(lgConfigK, TgtHllType.valueOf(tgtHllType.toUpperCase))
+    // scalastyle:on caselocale
+  }
+
+  /**
+   * Evaluate the input row and update the HllSketch instance with the row's value.
+   * The update function only supports a subset of Spark SQL types, and an
+   * UnsupportedOperationException will be thrown for unsupported types.
+   *
+   * @param sketch The HllSketch instance.
+   * @param input  an input row
+   */
+  override def update(sketch: HllSketch, input: InternalRow): HllSketch = {
+val v = child.eval(input)
+if (v != null) {
+  child.dataType match {
+// Update implemented for all types supported by HllSketch
+// Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out
+case IntegerType => sketch.update(v.asInstanceOf[Int])
+case LongType => sketch.update(v.asInstanceOf[Long])
+case DoubleType => sketch.update(v.asInstanceOf[Double])

Review Comment:
   I don't think it is a good idea to support IEEE floating-point input types 
for these functions. Comparing floating-point values to each other is imprecise, 
which can make the results unstable. By the same token, grouping by 
floating-point values in a GROUP BY is not a good idea. For reference, see these 
function definitions, which do not include floating-point input types [1].
   
   On the other hand, I don't see any reason why we can't support DecimalType, 
Datetime, or Interval types (we can leave a TODO comment to support those in 
another PR if we want).
   
   [1] https://github.com/google/zetasql/blob/master/docs/hll_functions.md
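
To make the floating-point concern above concrete, here is a minimal sketch in plain Java. It uses an exact `HashSet<Double>` as a stand-in for a distinct-count sketch; the class `FloatDistinctDemo` and its `distinctCount` helper are hypothetical names for illustration and are not part of the PR or of the DataSketches API. It makes no claim about how `HllSketch.update(double)` treats these values internally; it only shows that values a user would consider equal can be distinct as doubles.

```java
import java.util.HashSet;
import java.util.Set;

final class FloatDistinctDemo {
    // Exact distinct count over boxed doubles, standing in for a sketch's estimate.
    static int distinctCount(double... values) {
        Set<Double> seen = new HashSet<>();
        for (double v : values) {
            seen.add(v);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Rounding error: 0.1 + 0.2 is not the same double as 0.3.
        System.out.println(distinctCount(0.1 + 0.2, 0.3));
        // Summation order changes the result, so the same data can yield different keys.
        System.out.println(distinctCount((0.1 + 0.2) + 0.3, 0.1 + (0.2 + 0.3)));
        // Signed zeros compare equal with ==, but Double.equals treats them as different.
        System.out.println(0.0 == -0.0);
        System.out.println(distinctCount(0.0, -0.0));
    }
}
```

An upstream computation that sums values in a different order, as distributed aggregation may do, can therefore change the distinct count of its results even though the inputs are identical.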



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LI

[GitHub] [spark] sunchao commented on a diff in pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader

2023-03-31 Thread via GitHub


sunchao commented on code in PR #39950:
URL: https://github.com/apache/spark/pull/39950#discussion_r1154849296


##
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java:
##
@@ -89,17 +90,28 @@
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
   throws IOException, InterruptedException {
+initialize(inputSplit, taskAttemptContext, Option.empty());
+  }
+
+  public void initialize(
+  InputSplit inputSplit,
+  TaskAttemptContext taskAttemptContext,
+  Option fileFooter) throws IOException, InterruptedException {
 Configuration configuration = taskAttemptContext.getConfiguration();
 FileSplit split = (FileSplit) inputSplit;
 this.file = split.getPath();
-
-ParquetReadOptions options = HadoopReadOptions
-  .builder(configuration, file)
-  .withRange(split.getStart(), split.getStart() + split.getLength())
-  .withCodecFactory(new ParquetCodecFactory(configuration, 0))
-  .build();
-ParquetFileReader fileReader = new ParquetFileReader(
-HadoopInputFile.fromPath(file, configuration), options);
+ParquetFileReader fileReader;
+if (fileFooter.isDefined()) {
+  fileReader = new ParquetFileReader(configuration, file, fileFooter.get());

Review Comment:
   Yes, this will be an issue unless Spark is upgraded to Parquet 1.12.4.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
##
@@ -205,11 +205,22 @@ class ParquetFileFormat
 
   val sharedConf = broadcastedHadoopConf.value.value
 
-  lazy val footerFileMetaData =
-ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+  val fileFooter = if (enableVectorizedReader) {
+// This can avoid reading the footer twice(currently only optimize for vectorized read).

Review Comment:
   not addressed yet






[GitHub] [spark] RyanBerti commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch

2023-03-31 Thread via GitHub


RyanBerti commented on code in PR #40615:
URL: https://github.com/apache/spark/pull/40615#discussion_r1154926480


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */
+  override def nullable: Boolean = false
+
+  /**
+   * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params.
+   *
+   * @return an HllSketch instance
+   */
+  override def createAggregationBuffer(): HllSketch = {
+// scalastyle:off caselocale

Review Comment:
   I initially had that in place, but removed it as I wasn't sure the 
enum.valueOf call would handle any non-English locales. I can update with 
Locale.ROOT though; I was just unfamiliar with that implementation.
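
As a rough illustration of why a fixed locale matters for this kind of lookup, the sketch below upper-cases a name before calling `valueOf`. The `SketchKind` enum and the `LocaleSafeEnumLookup` class are hypothetical and exist only for this example; they are not the DataSketches `TgtHllType` enum, and the example assumes nothing about which constant names that enum has.

```java
import java.util.Locale;

final class LocaleSafeEnumLookup {
    // Hypothetical enum for illustration; the names contain 'I' so Turkish casing rules apply.
    enum SketchKind { TINY, WIDE }

    // Locale.ROOT gives locale-neutral casing, so the lookup does not depend on the JVM default.
    static SketchKind parse(String name) {
        return SketchKind.valueOf(name.toUpperCase(Locale.ROOT));
    }

    // Exposes locale-sensitive upper-casing so the difference can be observed.
    static String upperIn(String name, Locale locale) {
        return name.toUpperCase(locale);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr");
        // In Turkish, 'i' upper-cases to U+0130 (dotted capital I), so this is not "TINY".
        System.out.println(upperIn("tiny", turkish).equals("TINY"));
        // With Locale.ROOT the result matches the enum constant name.
        System.out.println(parse("tiny"));
    }
}
```

Passing the Turkish-cased string to `valueOf` would throw `IllegalArgumentException`, which is the failure mode the `caselocale` style check is meant to prevent.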






[GitHub] [spark] RyanBerti commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch

2023-03-31 Thread via GitHub


RyanBerti commented on code in PR #40615:
URL: https://github.com/apache/spark/pull/40615#discussion_r1154928718


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
+import org.apache.datasketches.memory.WritableMemory
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.trees.UnaryLike
+import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+ * This datasketchesAggregates file is intended to encapsulate all of the
+ * aggregate functions that utilize Datasketches sketch objects as intermediate
+ * aggregation buffers.
+ *
+ * The HllSketchAggregate sealed trait is meant to be extended by the aggregate
+ * functions which utilize instances of HllSketch to count uniques.
+ */
+sealed trait HllSketchAggregate
+  extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] {
+
+  // These are used as params when instantiating the HllSketch object
+  // and are set by the case classes' args that extend this trait
+
+  def lgConfigK: Int
+  def tgtHllType: String
+
+  // From here on, these are the shared default implementations for TypedImperativeAggregate
+
+  /** Aggregate functions which utilize HllSketch instances should never return null */

Review Comment:
   Just ran a quick test, and both null and empty return 0: 
   
   ```  
   val sketch = new HllSketch()
   assert(sketch.getEstimate == 0)
   val nullString: String = null
   sketch.update(nullString)
   assert(sketch.getEstimate == 0)
   ```






[GitHub] [spark] WweiL commented on pull request #40586: [SPARK-42939][SS][CONNECT] Core streaming Python API for Spark Connect

2023-03-31 Thread via GitHub


WweiL commented on PR #40586:
URL: https://github.com/apache/spark/pull/40586#issuecomment-1492687197

   I have less expertise in protobuf; otherwise LGTM.





[GitHub] [spark] ueshin opened a new pull request, #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct

2023-03-31 Thread via GitHub


ueshin opened a new pull request, #40627:
URL: https://github.com/apache/spark/pull/40627

   ### What changes were proposed in this pull request?
   
   Fix `DataFrame.collect` with null struct.
   
   ### Why are the changes needed?
   
   There is a behavior difference when collecting a `null` struct:
   
   In Spark Connect:
   
   ```py
   >>> df = spark.sql("values (1, struct('a' as x)), (2, struct(null as x)), 
(null, null) as t(a, b)")
   >>> df.printSchema()
   root
    |-- a: integer (nullable = true)
    |-- b: struct (nullable = true)
    |    |-- x: string (nullable = true)
   >>> df.show()
   +----+------+
   |   a|     b|
   +----+------+
   |   1|   {a}|
   |   2|{null}|
   |null|  null|
   +----+------+
   
   >>> df.collect()
   [Row(a=1, b=Row(x='a')), Row(a=2, b=Row(x=None)), Row(a=None, b=<Row()>)]
   ```
   
   whereas PySpark:
   
   ```py
   >>> df.collect()
   [Row(a=1, b=Row(x='a')), Row(a=2, b=Row(x=None)), Row(a=None, b=None)]
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, this fixes the behavior difference described above.
   
   ### How was this patch tested?
   
   Added/modified the related tests.





[GitHub] [spark] zhenlineo opened a new pull request, #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

2023-03-31 Thread via GitHub


zhenlineo opened a new pull request, #40628:
URL: https://github.com/apache/spark/pull/40628

   ### What changes were proposed in this pull request?
   Implements the missing `foreach` and `foreachPartition` methods in Dataset.
   This PR is based on https://github.com/apache/spark/pull/40581.
   The implementation of `foreachPartition` is built on `mapPartitions` + `count`.
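
The idea of building foreachPartition on mapPartitions + count can be sketched with plain Java collections. This is only an illustration under stated assumptions: `ToyDataset` is a hypothetical stand-in and not the Spark Connect `Dataset` class, partitions are modelled as lazy iterator suppliers, and `count` plays the role of the action that forces evaluation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical toy dataset: each partition is a lazily evaluated iterator.
final class ToyDataset<T> {
    private final List<Supplier<Iterator<T>>> partitions;

    private ToyDataset(List<Supplier<Iterator<T>>> partitions) {
        this.partitions = partitions;
    }

    static <T> ToyDataset<T> of(List<List<T>> data) {
        List<Supplier<Iterator<T>>> parts = new ArrayList<>();
        for (List<T> p : data) {
            parts.add(p::iterator);
        }
        return new ToyDataset<>(parts);
    }

    // Transformation: records the function but runs nothing yet.
    <U> ToyDataset<U> mapPartitions(Function<Iterator<T>, Iterator<U>> f) {
        List<Supplier<Iterator<U>>> out = new ArrayList<>();
        for (Supplier<Iterator<T>> p : partitions) {
            out.add(() -> f.apply(p.get()));
        }
        return new ToyDataset<>(out);
    }

    // Action: evaluates every partition and counts the produced elements.
    long count() {
        long n = 0;
        for (Supplier<Iterator<T>> p : partitions) {
            Iterator<T> it = p.get();
            while (it.hasNext()) {
                it.next();
                n++;
            }
        }
        return n;
    }

    // Run the side effect inside mapPartitions, emit nothing, and let count() trigger it.
    void foreachPartition(Consumer<Iterator<T>> f) {
        this.<Object>mapPartitions(it -> {
            f.accept(it);
            return Collections.emptyIterator();
        }).count();
    }

    public static void main(String[] args) {
        ToyDataset<Integer> ds = ToyDataset.of(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)));
        ds.foreachPartition(it -> it.forEachRemaining(System.out::println));
    }
}
```

Because each mapped partition returns an empty iterator, the count itself is always zero; it serves only to force the side effects to run once per partition.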
   
   ### Why are the changes needed?
   Add missing methods in Dataset.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   E2E tests.





[GitHub] [spark] github-actions[bot] commented on pull request #39136: Create stable names for dynamically generated classes

2023-03-31 Thread via GitHub


github-actions[bot] commented on PR #39136:
URL: https://github.com/apache/spark/pull/39136#issuecomment-1492754069

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] commented on pull request #39114: [SPARK-40708][SQL][WIP] Auto update partition statistics based on write metrics

2023-03-31 Thread via GitHub


github-actions[bot] commented on PR #39114:
URL: https://github.com/apache/spark/pull/39114#issuecomment-1492754086

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] github-actions[bot] closed pull request #39130: [SPARK-xxxxx][DOCUMENTATION][PYTHON] Fix grammar in docstring for toDF().

2023-03-31 Thread via GitHub


github-actions[bot] closed pull request #39130: 
[SPARK-x][DOCUMENTATION][PYTHON] Fix grammar in docstring for toDF().
URL: https://github.com/apache/spark/pull/39130





[GitHub] [spark] zhenlineo commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


zhenlineo commented on PR #40564:
URL: https://github.com/apache/spark/pull/40564#issuecomment-1492754667

   cc @LuciferYang Can we merge this?





[GitHub] [spark] liuzqt opened a new pull request, #40629: [SPARK-42980][CORE] Implement a lightweight SmallBroadcast

2023-03-31 Thread via GitHub


liuzqt opened a new pull request, #40629:
URL: https://github.com/apache/spark/pull/40629

   ### What changes were proposed in this pull request?
   The current TorrentBroadcast implementation was originally designed for large 
data transmission, where the driver might become a bottleneck and memory might 
also be a concern. It is therefore fairly heavyweight and comes with some fixed 
overhead:
   
   - torrent protocol: more rounds of traffic
   - disk-level persistence and BlockManager: extra overhead
   
   which makes it inefficient for small data transmission.
   
   We can have a lightweight broadcast implementation, e.g., SmallBroadcast, 
which implements a star-topology broadcast (avoiding the unnecessary rounds of 
traffic) and may be in-memory only.
   
   Note:
   
   The current factory-style API seems to encourage only one broadcast style at 
runtime; however, this small broadcast implementation is a supplement to 
`TorrentBroadcast`, so I introduce new small broadcast APIs along the code path.
   
   ### Why are the changes needed?
   Provide a lightweight implementation of `Broadcast` that is suitable for 
situations where the data size is small and latency is critical.
   
   
   ### Does this PR introduce _any_ user-facing change?
   NO
   
   
   ### How was this patch tested?
   
   Unit tests. More test cases are from `BroadcastSuite`.
   





[GitHub] [spark] aokolnychyi opened a new pull request, #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi opened a new pull request, #40630:
URL: https://github.com/apache/spark/pull/40630

   
   
   ### What changes were proposed in this pull request?
   
   
   This PR fixes `TableOutputResolver` to use correct column paths in error 
messages for arrays and maps.
   
   ### Why are the changes needed?
   
   
   These changes are needed to have accurate error messages when there is a 
type mismatch inside arrays and maps.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No.
   
   ### How was this patch tested?
   
   
   This PR comes with tests.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi commented on code in PR #40630:
URL: https://github.com/apache/spark/pull/40630#discussion_r1155030408


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##
@@ -203,16 +204,17 @@ object TableOutputResolver {
   addError(s"Cannot write nullable values to map of non-nulls: '${colPath.quoted}'")
   None
 } else {
-  val keyParam = NamedLambdaVariable("k", inputType.keyType, nullable = false)
-  val fakeKeyAttr = AttributeReference("k", expectedType.keyType, nullable = false)()
+  val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false)
+  val fakeKeyAttr = AttributeReference("key", expectedType.keyType, nullable = false)()
   val resKey = reorderColumnsByName(
-Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath :+ "key")

Review Comment:
   Error messages included `map_col.k.key.struct_col`. They include 
`map_col.key.struct_col` now.






[GitHub] [spark] aokolnychyi commented on pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi commented on PR #40630:
URL: https://github.com/apache/spark/pull/40630#issuecomment-1492767210

   cc  @cloud-fan @dongjoon-hyun @viirya @huaxingao @sunchao





[GitHub] [spark] clownxc closed pull request #40626: [SPARK-42860][SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode

2023-03-31 Thread via GitHub


clownxc closed pull request #40626: [SPARK-42860][SQL] Add analysed logical 
mode in org.apache.spark.sql.execution.ExplainMode
URL: https://github.com/apache/spark/pull/40626





[GitHub] [spark] LuciferYang commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on PR #40564:
URL: https://github.com/apache/spark/pull/40564#issuecomment-1492772726

   The PR title should be `[SPARK-42519][CONNECT][TESTS] ...`





[GitHub] [spark] LuciferYang commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on PR #40564:
URL: https://github.com/apache/spark/pull/40564#issuecomment-1492773043

   @zhenlineo Thanks for pinging me. Let me do some manual checks.
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155033254


##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##
@@ -434,7 +433,9 @@ abstract class InMemoryBaseTable(
 protected var streamingWriter: StreamingWrite = StreamingAppend
 
 override def overwriteDynamicPartitions(): WriteBuilder = {
-  assert(writer == Append)
+  if (writer != Append) {

Review Comment:
   What is the reason for changing the exception type?
   
   






[GitHub] [spark] clownxc opened a new pull request, #40631: [SPARK-42860] [SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode

2023-03-31 Thread via GitHub


clownxc opened a new pull request, #40631:
URL: https://github.com/apache/spark/pull/40631

   
   
   ### What changes were proposed in this pull request?
   
   This PR proposes to provide a 'ValidationMode' in 
org.apache.spark.sql.execution.ExplainMode that prints only the parsed and 
analysed logical plans.
   
   ### Why are the changes needed?
   
   Validate the SQL query before submitting the job. We are currently using 
`df.explain(extended=true)`, which generates the parsed, analysed, and optimised 
logical plans and the physical plan, but generating the optimised logical plan 
sometimes takes extra time.
   
   ### Does this PR introduce *any* user-facing change?
   
   Yes
   
   ### How was this patch tested?
   
   Pass GitHub Actions
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155033747


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##
@@ -58,10 +58,15 @@ object SparkConnectServerUtils {
 
   private lazy val sparkConnect: Process = {
 debug("Starting the Spark Connect Server...")
-val jar = findJar(
+val connectJar = findJar(
   "connector/connect/server",
   "spark-connect-assembly",
   "spark-connect").getCanonicalPath
+var driverClassPath = connectJar

Review Comment:
   I think we can make `driverClassPath` a `val`, like:
   ```
   val driverClassPath = if (IntegrationTestUtils.isCatalystTestJarAvailable) {
     connectJar + ":" ...
   } else connectJar
   ```
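
The suggestion above relies on `if`/`else` being an expression in Scala, so the result can be bound once to a `val` instead of mutating a `var`. A minimal, self-contained sketch of that pattern follows; the object name, the `testJarAvailable` flag, and the `testJar` parameter are hypothetical stand-ins for illustration, not names from the PR.

```scala
import java.io.File

object DriverClassPathSketch {
  // Hypothetical helper: `testJarAvailable` and `testJar` stand in for whatever
  // the real test utility checks and appends; they are assumptions.
  def driverClassPath(connectJar: String, testJarAvailable: Boolean, testJar: String): String = {
    // `if`/`else` yields a value, so no `var` or `+=` is needed.
    val classPath =
      if (testJarAvailable) connectJar + File.pathSeparator + testJar
      else connectJar
    classPath
  }
}
```

Using `File.pathSeparator` instead of a hard-coded `":"` is a design choice of this sketch only; it keeps the example portable across platforms.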






[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155034222


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)
+withTable("testcat.myTableV2") {
+  spark.conf.set(

Review Comment:
   Can we use `withSQLConf`?
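
For context, a `withSQLConf`-style helper sets configuration values for the duration of a block and restores the previous values afterwards, which a bare `spark.conf.set(...)` does not do. The sketch below is a simplified model of that loan pattern under stated assumptions: `FakeConf` and `withConf` are hypothetical names, and this is not the actual Spark test helper.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a session's runtime config; not Spark's RuntimeConfig.
final class FakeConf {
  private val settings = mutable.Map.empty[String, String]
  def get(key: String): Option[String] = settings.get(key)
  def set(key: String, value: String): Unit = settings.update(key, value)
  def unset(key: String): Unit = { settings.remove(key); () }
}

object WithConfSketch {
  // Applies `pairs`, runs `body`, then restores the earlier values,
  // even if `body` throws.
  def withConf[T](conf: FakeConf)(pairs: (String, String)*)(body: => T): T = {
    // Remember what each key held before the override (None if unset).
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf.set(k, v) }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf.set(k, old)
      case (k, None) => conf.unset(k)
    }
  }
}
```

The benefit for a test suite is that a setting applied in one test cannot leak into the tests that run after it.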






[GitHub] [spark] zhengruifeng closed pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct

2023-03-31 Thread via GitHub


zhengruifeng closed pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix 
DataFrame.collect with null struct
URL: https://github.com/apache/spark/pull/40627





[GitHub] [spark] zhengruifeng commented on pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct

2023-03-31 Thread via GitHub


zhengruifeng commented on PR #40627:
URL: https://github.com/apache/spark/pull/40627#issuecomment-1492778516

   Thanks, merged into master/branch-3.4





[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155038497


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)

Review Comment:
   When I run `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite" 
-Phive` locally, the new tests are always `IGNORED`.
   
   Would you mind refactoring the following code so that local sbt runs execute 
them by default? 
   
   
https://github.com/apache/spark/blob/74cddcfda3ac4779de80696cdae2ba64d53fc635/project/SparkBuild.scala#L855-L857






[GitHub] [spark] zhengruifeng commented on a diff in pull request #40607: [SPARK-42993][ML][CONNECT] Make PyTorch Distributor support Spark Connect

2023-03-31 Thread via GitHub


zhengruifeng commented on code in PR #40607:
URL: https://github.com/apache/spark/pull/40607#discussion_r1155039367


##
python/pyspark/ml/torch/distributor.py:
##
@@ -581,12 +590,12 @@ def _run_distributed_training(
 f"Started distributed training with {self.num_processes} executor 
proceses"

Review Comment:
   thanks, will fix






[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155039507


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)
+withTable("testcat.myTableV2") {
+  spark.conf.set(

Review Comment:
   @Hisoka-X Or should we make this the default config for 
`SimpleSparkConnectService` startup, so that we can skip this manual configuration?
   
   @zhenlineo WDYT?
   
   






[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


LuciferYang commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155039927


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)

Review Comment:
   For example, add `(LocalProject("catalyst") / Test / Keys.`package`).value`?






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


dongjoon-hyun commented on code in PR #40630:
URL: https://github.com/apache/spark/pull/40630#discussion_r1155040511


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##
@@ -179,8 +179,9 @@ object TableOutputResolver {
   addError(s"Cannot write nullable elements to array of non-nulls: 
'${colPath.quoted}'")
   None
 } else {
-  val param = NamedLambdaVariable("x", inputType.elementType, 
inputType.containsNull)
-  val fakeAttr = AttributeReference("x", expectedType.elementType, 
expectedType.containsNull)()
+  val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)

Review Comment:
   May I ask why we need to change variable name from `x` to `element` for this 
bug fix?






[GitHub] [spark] dongjoon-hyun commented on pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4

2023-03-31 Thread via GitHub


dongjoon-hyun commented on PR #40555:
URL: https://github.com/apache/spark/pull/40555#issuecomment-1492793200

   Shall we close this PR, since the Apache Parquet 1.12.4 vote seems to have 
failed due to a technical issue?
   
   For the record, the release manager created it from the wrong branch.
   - 
https://github.com/apache/parquet-mr/commit/22069e58494e7cb5d50e664c7ffa1cf1468404f8
   ```
   - 1.13.0-SNAPSHOT
   + 1.12.4
   ```





[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi commented on code in PR #40630:
URL: https://github.com/apache/spark/pull/40630#discussion_r1155044709


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##
@@ -179,8 +179,9 @@ object TableOutputResolver {
   addError(s"Cannot write nullable elements to array of non-nulls: 
'${colPath.quoted}'")
   None
 } else {
-  val param = NamedLambdaVariable("x", inputType.elementType, 
inputType.containsNull)
-  val fakeAttr = AttributeReference("x", expectedType.elementType, 
expectedType.containsNull)()
+  val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)

Review Comment:
   The variable name is included in the error message because it will be part 
of the column path.
   Without this rename, the error message will reference `array_col.x`, where 
`x` is not known to the user.
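
To illustrate the point, here is a deliberately simplified model of how a placeholder name can end up in a reported column path. It is a sketch under stated assumptions: `ColumnPathSketch` and `elementPath` are hypothetical names, and the real `TableOutputResolver` logic is more involved than a string join.

```scala
object ColumnPathSketch {
  // Simplified model: the name given to the element placeholder is appended
  // to the enclosing column path and then rendered as a dotted string.
  def elementPath(colPath: Seq[String], placeholderName: String): String =
    (colPath :+ placeholderName).mkString(".")
}
```

In this model, `ColumnPathSketch.elementPath(Seq("array_col"), "x")` renders as `array_col.x`, while passing `"element"` renders as `array_col.element`, which is the more recognizable form for a user reading the error.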






[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi commented on code in PR #40630:
URL: https://github.com/apache/spark/pull/40630#discussion_r1155045026


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##
@@ -179,8 +179,9 @@ object TableOutputResolver {
   addError(s"Cannot write nullable elements to array of non-nulls: 
'${colPath.quoted}'")
   None
 } else {
-  val param = NamedLambdaVariable("x", inputType.elementType, 
inputType.containsNull)
-  val fakeAttr = AttributeReference("x", expectedType.elementType, 
expectedType.containsNull)()
+  val param = NamedLambdaVariable("element", inputType.elementType, 
inputType.containsNull)

Review Comment:
   It is similar to what we already do in `DataType$canWrite()`.






[GitHub] [spark] Hisoka-X opened a new pull request, #40632: [SPARK-42298][SQL] Assign name to _LEGACY_ERROR_TEMP_2132

2023-03-31 Thread via GitHub


Hisoka-X opened a new pull request, #40632:
URL: https://github.com/apache/spark/pull/40632

   
   
   ### What changes were proposed in this pull request?
   This PR proposes to assign a name to _LEGACY_ERROR_TEMP_2132: 
"CANNOT_PARSE_JSON_ARRAYS_AS_STRUCTS".
   
   
   
   ### Why are the changes needed?
   Assign a proper name to the LEGACY_ERROR_TEMP error class.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   
   ### How was this patch tested?
   ./build/sbt "testOnly org.apache.spark.sql.errors.QueryExecutionErrorsSuite"
   
   





[GitHub] [spark] Hisoka-X commented on a diff in pull request #40632: [SPARK-42298][SQL] Assign name to _LEGACY_ERROR_TEMP_2132

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40632:
URL: https://github.com/apache/spark/pull/40632#discussion_r1155046085


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##
@@ -1404,8 +1404,8 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
 
   def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = {
 new SparkRuntimeException(
-  errorClass = "_LEGACY_ERROR_TEMP_2132",
-  messageParameters = Map.empty)
+  errorClass = "CANNOT_PARSE_JSON_ARRAYS_AS_STRUCTS",

Review Comment:
   This error is never thrown to the user because it is intercepted by 
`MALFORMED_RECORD_IN_PARSING`, so I didn't add a test for it.






[GitHub] [spark] zhengruifeng commented on pull request #40563: [SPARK-41232][SPARK-41233][FOLLOWUP] Refactor `array_append` and `array_prepend` with `RuntimeReplaceable`

2023-03-31 Thread via GitHub


zhengruifeng commented on PR #40563:
URL: https://github.com/apache/spark/pull/40563#issuecomment-1492810135

   ping @cloud-fan 





[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155047034


##
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##
@@ -434,7 +433,9 @@ abstract class InMemoryBaseTable(
 protected var streamingWriter: StreamingWrite = StreamingAppend
 
 override def overwriteDynamicPartitions(): WriteBuilder = {
-  assert(writer == Append)
+  if (writer != Append) {

Review Comment:
   The reason is that we want to use the catalyst test jar when submitting 
ConnectService, so that we can use `InMemoryTableCatalog`. But if `scalatest` is 
used by `InMemoryTableCatalog`, we would have to add the `scalatest` jar to 
ConnectService too, so I removed it.
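
The general idea can be sketched as replacing a test-framework assertion with an explicit check that only needs JDK classes at runtime. The example below is an assumption-laden illustration: the object, the `Command` hierarchy, the exception type, and the message are all hypothetical, and the exception actually thrown in the PR is not shown in this thread.

```scala
object WriterCheckSketch {
  // Hypothetical stand-ins for the table's write commands.
  sealed trait Command
  case object Append extends Command
  case object Overwrite extends Command

  // Explicit check: throws a plain JDK exception, so no test-framework
  // class has to be on the server's classpath.
  def requireAppend(writer: Command): Unit =
    if (writer != Append) {
      throw new IllegalArgumentException(s"Unsupported writer type: $writer")
    }
}
```

The trade-off is that a failure surfaces as a different exception type than before, which is what the reviewer's question is about.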






[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error

2023-03-31 Thread via GitHub


shrprasa commented on PR #40258:
URL: https://github.com/apache/spark/pull/40258#issuecomment-1492817996

   @cloud-fan I have made the change. All tests have passed. Can you please 
review?





[GitHub] [spark] shrprasa commented on pull request #40128: [SPARK-42466][K8S]: Cleanup k8s upload directory when job terminates

2023-03-31 Thread via GitHub


shrprasa commented on PR #40128:
URL: https://github.com/apache/spark/pull/40128#issuecomment-1492818493

   @holdenk @dongjoon-hyun TTL-based clearance has limitations, as pointed out in 
this comment:
   https://github.com/apache/spark/pull/40363#issuecomment-1467439787





[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155054750


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##
@@ -58,10 +58,15 @@ object SparkConnectServerUtils {
 
   private lazy val sparkConnect: Process = {
 debug("Starting the Spark Connect Server...")
-val jar = findJar(
+val connectJar = findJar(
   "connector/connect/server",
   "spark-connect-assembly",
   "spark-connect").getCanonicalPath
+var driverClassPath = connectJar

Review Comment:
   Thanks for the advice. Done.






[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155054855


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)
+withTable("testcat.myTableV2") {
+  spark.conf.set(

Review Comment:
   I removed it and changed `RemoteSparkSession` to set the config `spark.sql.catalog.testcat` by default.






[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client

2023-03-31 Thread via GitHub


Hisoka-X commented on code in PR #40564:
URL: https://github.com/apache/spark/pull/40564#discussion_r1155054885


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
 }
   }
 
+  test("writeTo with create") {
+assume(IntegrationTestUtils.isCatalystTestJarAvailable)

Review Comment:
   Done.






[GitHub] [spark] wangyum closed pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4

2023-03-31 Thread via GitHub


wangyum closed pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4
URL: https://github.com/apache/spark/pull/40555





[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps

2023-03-31 Thread via GitHub


aokolnychyi commented on code in PR #40630:
URL: https://github.com/apache/spark/pull/40630#discussion_r1155045026


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##
@@ -179,8 +179,9 @@ object TableOutputResolver {
   addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'")
   None
 } else {
-  val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull)
-  val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)()
+  val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull)

Review Comment:
   The new logic is similar to what we already do in `DataType$canWrite()`.
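
   To illustrate the idea, here is a minimal, self-contained sketch of path-aware type checking. It is not Spark code: `SimpleType`, `ArrayT`, `MapT`, and `WriteCheck.errors` are hypothetical names invented for this example, and the `element` / `key` / `value` path segments are an assumption about the naming convention. The point is only that appending a descriptive segment at each nesting level lets the error message name the exact nested column.

```scala
// Hypothetical, simplified type model; these are NOT Spark's DataType classes.
sealed trait SimpleType
case object IntT extends SimpleType
case object StringT extends SimpleType
final case class ArrayT(elementType: SimpleType, containsNull: Boolean) extends SimpleType
final case class MapT(keyType: SimpleType, valueType: SimpleType, valueContainsNull: Boolean)
  extends SimpleType

object WriteCheck {
  private def quoted(path: Seq[String]): String = path.mkString("'", ".", "'")

  // Returns one message per incompatibility, each naming the full nested path.
  def errors(input: SimpleType, expected: SimpleType, path: Seq[String]): Seq[String] =
    (input, expected) match {
      case (ArrayT(inElem, inNull), ArrayT(exElem, exNull)) =>
        val nullability =
          if (inNull && !exNull) {
            Seq(s"Cannot write nullable elements to array of non-nulls: ${quoted(path)}")
          } else {
            Seq.empty
          }
        // Descend into the element type under an "element" path segment.
        nullability ++ errors(inElem, exElem, path :+ "element")

      case (MapT(inKey, inValue, inNull), MapT(exKey, exValue, exNull)) =>
        val nullability =
          if (inNull && !exNull) {
            Seq(s"Cannot write nullable values to map of non-nulls: ${quoted(path)}")
          } else {
            Seq.empty
          }
        // Keys and values get their own path segments.
        nullability ++
          errors(inKey, exKey, path :+ "key") ++
          errors(inValue, exValue, path :+ "value")

      case (in, ex) if in == ex => Seq.empty

      case (in, ex) => Seq(s"Cannot write $in to $ex: ${quoted(path)}")
    }
}
```

   With this sketch, a mismatch inside a map value nested in an array column `arr` would be reported against the path `arr.element.value` rather than against a placeholder name such as `x`.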


