[GitHub] [spark] wangyum opened a new pull request, #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode
wangyum opened a new pull request, #40616:
URL: https://github.com/apache/spark/pull/40616

### What changes were proposed in this pull request?
1. Moved `ResolveBinaryArithmetic` from `Analyzer` to `TypeCoercion` and `AnsiTypeCoercion`.
2. Update `ResolveBinaryArithmetic` in `AnsiTypeCoercion` to only support datetime types +/- intervals.

### Why are the changes needed?
Avoid potential data issues.

### Does this PR introduce _any_ user-facing change?
Disable string +/- interval in ANSI mode.

### How was this patch tested?
Update existing test.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] itholic opened a new pull request, #40617: [SPARK-42992][PYTHON] Introduce PySparkRuntimeError
itholic opened a new pull request, #40617:
URL: https://github.com/apache/spark/pull/40617

### What changes were proposed in this pull request?
This PR proposes to introduce a new error, `PySparkRuntimeError`, for PySpark.

### Why are the changes needed?
To cover the built-in `RuntimeError` with the PySpark error framework.

### Does this PR introduce _any_ user-facing change?
No, it's an internal error framework improvement.

### How was this patch tested?
The existing CI should pass.
[GitHub] [spark] wangyum commented on pull request #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode
wangyum commented on PR #40616: URL: https://github.com/apache/spark/pull/40616#issuecomment-1491411368 cc @cloud-fan @gengliangwang
[GitHub] [spark] thyecust opened a new pull request, #40618: fix typo in StorageLevel __eq__()
thyecust opened a new pull request, #40618:
URL: https://github.com/apache/spark/pull/40618

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
[GitHub] [spark] cloud-fan commented on a diff in pull request #40523: [SPARK-42897][SQL] Avoid evaluate more than once for the variables from the left side in the FullOuter SMJ condition
cloud-fan commented on code in PR #40523:
URL: https://github.com/apache/spark/pull/40523#discussion_r1154099624

## sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala: ##
@@ -1036,8 +1036,17 @@ case class SortMergeJoinExec(
     val rightResultVars = genOneSideJoinVars(
       ctx, rightOutputRow, right, setDefaultValue = true)
     val resultVars = leftResultVars ++ rightResultVars
-    val (_, conditionCheck, _) =
-      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
+    // Evaluate the variables on the left and used in the condition but do not clear the code as

Review Comment:
This is very hard to review. Is there any similar code in other join types or `ShuffledHashJoinExec`? It seems other places are using `splitVarsByCondition` and `evaluateVariables`.
[GitHub] [spark] thyecust closed pull request #40618: fix typo in StorageLevel __eq__()
thyecust closed pull request #40618: fix typo in StorageLevel __eq__() URL: https://github.com/apache/spark/pull/40618
[GitHub] [spark] thyecust opened a new pull request, #40619: fix typo in StorageLevel __eq__()
thyecust opened a new pull request, #40619:
URL: https://github.com/apache/spark/pull/40619

### What changes were proposed in this pull request?
Replace `self.deserialized == self.deserialized` with `self.deserialized == other.deserialized`.

### Why are the changes needed?
The original expression is always True, which is likely a typo.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
No test added. Relies on GitHub Actions.
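A minimal sketch of the kind of typo described in #40619. The classes below are hypothetical stand-ins with two illustrative fields; they are not PySpark's actual `StorageLevel`, and only the `deserialized` comparison is taken from the PR description.

```python
class BuggyLevel:
    """Hypothetical stand-in for a storage-level-like class (not PySpark's StorageLevel)."""

    def __init__(self, use_disk: bool, deserialized: bool) -> None:
        self.use_disk = use_disk
        self.deserialized = deserialized

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, self.__class__)
            and self.use_disk == other.use_disk
            # Typo: the field is compared with itself, so this term is always True.
            and self.deserialized == self.deserialized
        )


class FixedLevel(BuggyLevel):
    """Same fields, but `deserialized` is compared against the other object."""

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, self.__class__)
            and self.use_disk == other.use_disk
            and self.deserialized == other.deserialized
        )


# Two objects that differ only in `deserialized`.
buggy_result = BuggyLevel(True, True) == BuggyLevel(True, False)
fixed_result = FixedLevel(True, True) == FixedLevel(True, False)
```

With the self-comparison, objects that differ only in `deserialized` compare equal; with the corrected term they do not.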
[GitHub] [spark] thyecust opened a new pull request, #40620: fix typo in pyspark/pandas/config.py
thyecust opened a new pull request, #40620:
URL: https://github.com/apache/spark/pull/40620

By comparing compute.isin_limit and plotting.max_rows, `v is v` is likely to be a typo.

### What changes were proposed in this pull request?
Replace `v is v >= 0` with `v >= 0`.

### Why are the changes needed?
By comparing compute.isin_limit and plotting.max_rows, `v is v` is likely to be a typo.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By GitHub Actions.
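For context on how Python reads the expression quoted in #40620: comparison operators chain, so `v is v >= 0` is evaluated as `(v is v) and (v >= 0)`. The sketch below assumes the expression appears exactly as written in the PR description; the function names and sample values are illustrative only.

```python
def check_with_typo(v: int) -> bool:
    # Chained comparison: equivalent to `(v is v) and (v >= 0)`.
    # `v is v` is always True, so only `v >= 0` decides the result.
    return v is v >= 0


def check_fixed(v: int) -> bool:
    return v >= 0


samples = [-5, -1, 0, 1, 2000]
# Pair up the results of both checks for each sample value.
results = [(check_with_typo(v), check_fixed(v)) for v in samples]
```

Under that reading the two forms return the same result for every input, so the change removes a redundant term without altering behavior.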
[GitHub] [spark] liangyu-1 opened a new pull request, #40621: Fix ExecutorAllocationManager cannot allocate new instances when all …
liangyu-1 opened a new pull request, #40621:
URL: https://github.com/apache/spark/pull/40621

### What changes were proposed in this pull request?
This PR fixes an issue where `ExecutorAllocationManager` cannot allocate new instances when all executors are down.

### Why are the changes needed?
As described in the issue [SPARK-42972](https://issues.apache.org/jira/browse/SPARK-42972)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
I tested this in our cluster, and the Spark Streaming app runs normally.
[GitHub] [spark] cloud-fan commented on a diff in pull request #40616: [SPARK-42991][SQL] Disable string type +/- interval in ANSI mode
cloud-fan commented on code in PR #40616:
URL: https://github.com/apache/spark/pull/40616#discussion_r1154114177

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala: ##
@@ -288,6 +290,82 @@ object AnsiTypeCoercion extends TypeCoercionBase {
     }
   }
+  // Please see the comments in `TypeCoercion.ResolveBinaryArithmetic`.
+  object ResolveBinaryArithmetic extends TypeCoercionRule {

Review Comment:
Is it possible to do some code sharing?
[GitHub] [spark] thyecust opened a new pull request, #40622: fix typo in ResourceRequest.equals()
thyecust opened a new pull request, #40622:
URL: https://github.com/apache/spark/pull/40622

`vendor == vendor` is always true, so this is likely a typo.

### What changes were proposed in this pull request?
Replace `vendor == vendor` with `that.vendor == vendor`, and `discoveryScript == discoveryScript` with `that.discoveryScript == discoveryScript`.

### Why are the changes needed?
`vendor == vendor` is always true, so this is likely a typo.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By GitHub Actions.
[GitHub] [spark] liangyu-1 commented on pull request #40621: Fix ExecutorAllocationManager cannot allocate new instances when all …
liangyu-1 commented on PR #40621: URL: https://github.com/apache/spark/pull/40621#issuecomment-1491457693 @Ngone51 @VasilyKolpakov please help me review the PR
[GitHub] [spark] yaooqinn commented on pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name with a schema-Name
yaooqinn commented on PR #40602: URL: https://github.com/apache/spark/pull/40602#issuecomment-1491489837 thanks, merged to master
[GitHub] [spark] yaooqinn closed pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name with a schema-Name
yaooqinn closed pull request #40602: [SPARK-42978][SQL] Derby&PG: RENAME cannot qualify a new-table-Name with a schema-Name URL: https://github.com/apache/spark/pull/40602
[GitHub] [spark] HyukjinKwon commented on pull request #40622: fix typo in ResourceRequest.equals()
HyukjinKwon commented on PR #40622: URL: https://github.com/apache/spark/pull/40622#issuecomment-1491509278 Hey, I think all these typo PRs should have a test if this is an actual issue.
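A rough sketch of what such a regression test could look like, written in Python for illustration even though `ResourceRequest` itself is Scala. The `Request` class is hypothetical: `vendor` and `discovery_script` mirror the fields named in the PR description, while `name` and `amount` are assumptions added to make the example complete. The idea is to change one field at a time and assert that equality notices.

```python
import unittest


class Request:
    """Hypothetical request-like class; not Spark's ResourceRequest."""

    def __init__(self, name: str, amount: int, discovery_script: str = "", vendor: str = "") -> None:
        self.name = name
        self.amount = amount
        self.discovery_script = discovery_script
        self.vendor = vendor

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, Request)
            and self.name == other.name
            and self.amount == other.amount
            and self.discovery_script == other.discovery_script
            and self.vendor == other.vendor
        )


class RequestEqualityTest(unittest.TestCase):
    def test_each_field_participates_in_equality(self) -> None:
        base = dict(name="gpu", amount=2, discovery_script="find.sh", vendor="acme")
        changed = dict(name="fpga", amount=4, discovery_script="other.sh", vendor="other")
        # Identical field values must compare equal.
        self.assertEqual(Request(**base), Request(**base))
        # Changing any single field must break equality; a self-comparison
        # typo such as `vendor == vendor` would make one of these subtests fail.
        for field, value in changed.items():
            with self.subTest(field=field):
                self.assertNotEqual(Request(**base), Request(**{**base, field: value}))
```

Saved as a module, this can be run with `python -m unittest`.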
[GitHub] [spark] HyukjinKwon commented on pull request #40622: fix typo in ResourceRequest.equals()
HyukjinKwon commented on PR #40622: URL: https://github.com/apache/spark/pull/40622#issuecomment-1491509670 And please file a JIRA if this is an issue, see also https://spark.apache.org/contributing.html
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
zhengruifeng commented on code in PR #40525:
URL: https://github.com/apache/spark/pull/40525#discussion_r1154257731

## dev/sparktestsupport/modules.py: ##
@@ -693,6 +693,56 @@ def __hash__(self):
         "pyspark.pandas.tests.test_typedef",
         "pyspark.pandas.tests.test_utils",
         "pyspark.pandas.tests.test_window",
+        # unittests - Spark Connect parity tests

Review Comment:
I think we should move the tests into `pyspark-connect`, and then make `pyspark-connect` also depend on `pyspark_core`.

## python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py: ##
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin
+from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class BinaryOpsParityTests(
+    BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
+):
+    @unittest.skip("Fails in Spark Connect, should enable.")

Review Comment:
I think we'd better document the failure reasons:
- some tests need to be enabled
- some can't be, possibly due to the private `_jvm` attribute used in the test
- etc.

But this can be done in follow-ups.
[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization
LuciferYang commented on code in PR #40611: URL: https://github.com/apache/spark/pull/40611#discussion_r1154265174 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala: ## @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client.arrow + +import java.io.{ByteArrayOutputStream, OutputStream} +import java.lang.invoke.{MethodHandles, MethodType} +import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger} +import java.nio.channels.Channels +import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.ByteString +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader} +import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel} +import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.util.Text + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.DefinedByConstructorParams +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.util.ArrowUtils + +/** + * Helper class for converting user objects into arrow batches. 
+ */ +class ArrowSerializer[T]( +private[this] val enc: AgnosticEncoder[T], +private[this] val allocator: BufferAllocator, +private[this] val timeZoneId: String) { + private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId) + private val vectors = root.getFieldVectors.asScala + private val unloader = new VectorUnloader(root) + private val schemaBytes = { +// Only serialize the schema once. +val bytes = new ByteArrayOutputStream() +MessageSerializer.serialize(newChannel(bytes), root.getSchema) +bytes.toByteArray + } + private var i: Int = 0 + + private def newChannel(output: OutputStream): WriteChannel = { +new WriteChannel(Channels.newChannel(output)) + } + + /** + * The size of the current batch. + * + * The size computed consist of the size of the schema and the size of the arrow buffers. The + * actual batch will be larger than that because of alignment, written IPC tokens, and the + * written record batch metadata. The size of the record batch metadata is proportional to the + * complexity of the schema. + */ + def sizeInBytes: Long = { +// We need to set the row count for getBufferSize to return the actual value. +root.setRowCount(i) +schemaBytes.length + vectors.map(_.getBufferSize).sum + } + + /** + * Append a record to the current batch. + */ + def append(record: T): Unit = { +serializer.write(i, record) +i += 1 + } + + /** + * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]]. + */ + def writeIpcStream(output: OutputStream): Unit = { +val channel = newChannel(output) +root.setRowCount(i) +val batch = unloader.getRecordBatch +try { + channel.write(schemaBytes) + MessageSerializer.serialize(channel, batch) + ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT) +} finally { + batch.close() +} + } + + /** + * Reset the serializer. + */ + def reset(): Unit = { +i = 0 +vectors.foreach(_.reset()) + } + + /** + * Close the serializer. 
+ */ + def close(): Unit = root.close() +} + +object ArrowSerializer { + import ArrowEncoderUtils._ + + /** + * Create an [[Iterator]] that
[GitHub] [spark] MaxGekk opened a new pull request, #40623: [WIP][SQL] Parameterized `sql()` with literal args
MaxGekk opened a new pull request, #40623:
URL: https://github.com/apache/spark/pull/40623

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization
LuciferYang commented on code in PR #40611: URL: https://github.com/apache/spark/pull/40611#discussion_r1154282542 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala: ## @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client.arrow + +import java.io.{ByteArrayOutputStream, OutputStream} +import java.lang.invoke.{MethodHandles, MethodType} +import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger} +import java.nio.channels.Channels +import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.ByteString +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader} +import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel} +import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.util.Text + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.DefinedByConstructorParams +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.util.ArrowUtils + +/** + * Helper class for converting user objects into arrow batches. 
+ */ +class ArrowSerializer[T]( +private[this] val enc: AgnosticEncoder[T], +private[this] val allocator: BufferAllocator, +private[this] val timeZoneId: String) { + private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId) + private val vectors = root.getFieldVectors.asScala + private val unloader = new VectorUnloader(root) + private val schemaBytes = { +// Only serialize the schema once. +val bytes = new ByteArrayOutputStream() +MessageSerializer.serialize(newChannel(bytes), root.getSchema) +bytes.toByteArray + } + private var i: Int = 0 + + private def newChannel(output: OutputStream): WriteChannel = { +new WriteChannel(Channels.newChannel(output)) + } + + /** + * The size of the current batch. + * + * The size computed consist of the size of the schema and the size of the arrow buffers. The + * actual batch will be larger than that because of alignment, written IPC tokens, and the + * written record batch metadata. The size of the record batch metadata is proportional to the + * complexity of the schema. + */ + def sizeInBytes: Long = { +// We need to set the row count for getBufferSize to return the actual value. +root.setRowCount(i) +schemaBytes.length + vectors.map(_.getBufferSize).sum + } + + /** + * Append a record to the current batch. + */ + def append(record: T): Unit = { +serializer.write(i, record) +i += 1 + } + + /** + * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]]. + */ + def writeIpcStream(output: OutputStream): Unit = { +val channel = newChannel(output) +root.setRowCount(i) +val batch = unloader.getRecordBatch +try { + channel.write(schemaBytes) + MessageSerializer.serialize(channel, batch) + ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT) +} finally { + batch.close() +} + } + + /** + * Reset the serializer. + */ + def reset(): Unit = { +i = 0 +vectors.foreach(_.reset()) + } + + /** + * Close the serializer. 
+ */ + def close(): Unit = root.close() +} + +object ArrowSerializer { + import ArrowEncoderUtils._ + + /** + * Create an [[Iterator]] that
[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization
LuciferYang commented on code in PR #40611: URL: https://github.com/apache/spark/pull/40611#discussion_r1154283195 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala: ## @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client.arrow + +import java.io.{ByteArrayOutputStream, OutputStream} +import java.lang.invoke.{MethodHandles, MethodType} +import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger} +import java.nio.channels.Channels +import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.ByteString +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader} +import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel} +import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.util.Text + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.DefinedByConstructorParams +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.util.ArrowUtils + +/** + * Helper class for converting user objects into arrow batches. 
+ */ +class ArrowSerializer[T]( +private[this] val enc: AgnosticEncoder[T], +private[this] val allocator: BufferAllocator, +private[this] val timeZoneId: String) { + private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId) + private val vectors = root.getFieldVectors.asScala + private val unloader = new VectorUnloader(root) + private val schemaBytes = { +// Only serialize the schema once. +val bytes = new ByteArrayOutputStream() +MessageSerializer.serialize(newChannel(bytes), root.getSchema) +bytes.toByteArray + } + private var i: Int = 0 + + private def newChannel(output: OutputStream): WriteChannel = { +new WriteChannel(Channels.newChannel(output)) + } + + /** + * The size of the current batch. + * + * The size computed consist of the size of the schema and the size of the arrow buffers. The + * actual batch will be larger than that because of alignment, written IPC tokens, and the + * written record batch metadata. The size of the record batch metadata is proportional to the + * complexity of the schema. + */ + def sizeInBytes: Long = { +// We need to set the row count for getBufferSize to return the actual value. +root.setRowCount(i) +schemaBytes.length + vectors.map(_.getBufferSize).sum + } + + /** + * Append a record to the current batch. + */ + def append(record: T): Unit = { +serializer.write(i, record) +i += 1 + } + + /** + * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]]. + */ + def writeIpcStream(output: OutputStream): Unit = { +val channel = newChannel(output) +root.setRowCount(i) +val batch = unloader.getRecordBatch +try { + channel.write(schemaBytes) + MessageSerializer.serialize(channel, batch) + ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT) +} finally { + batch.close() +} + } + + /** + * Reset the serializer. + */ + def reset(): Unit = { +i = 0 +vectors.foreach(_.reset()) + } + + /** + * Close the serializer. 
+ */ + def close(): Unit = root.close() +} + +object ArrowSerializer { + import ArrowEncoderUtils._ + + /** + * Create an [[Iterator]] that
[GitHub] [spark] LuciferYang commented on a diff in pull request #40611: [SPARK-42981][CONNECT] Add direct arrow serialization
LuciferYang commented on code in PR #40611: URL: https://github.com/apache/spark/pull/40611#discussion_r1154304836 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala: ## @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connect.client.arrow + +import java.io.{ByteArrayOutputStream, OutputStream} +import java.lang.invoke.{MethodHandles, MethodType} +import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInteger} +import java.nio.channels.Channels +import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period} +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.ByteString +import org.apache.arrow.memory.BufferAllocator +import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, DurationVector, FieldVector, Float4Vector, Float8Vector, IntervalYearVector, IntVector, NullVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, VarBinaryVector, VarCharVector, VectorSchemaRoot, VectorUnloader} +import org.apache.arrow.vector.complex.{ListVector, MapVector, StructVector} +import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel} +import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} +import org.apache.arrow.vector.util.Text + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.DefinedByConstructorParams +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.util.ArrowUtils + +/** + * Helper class for converting user objects into arrow batches. 
+ */ +class ArrowSerializer[T]( +private[this] val enc: AgnosticEncoder[T], +private[this] val allocator: BufferAllocator, +private[this] val timeZoneId: String) { + private val (root, serializer) = ArrowSerializer.serializerFor(enc, allocator, timeZoneId) + private val vectors = root.getFieldVectors.asScala + private val unloader = new VectorUnloader(root) + private val schemaBytes = { +// Only serialize the schema once. +val bytes = new ByteArrayOutputStream() +MessageSerializer.serialize(newChannel(bytes), root.getSchema) +bytes.toByteArray + } + private var i: Int = 0 + + private def newChannel(output: OutputStream): WriteChannel = { +new WriteChannel(Channels.newChannel(output)) + } + + /** + * The size of the current batch. + * + * The size computed consist of the size of the schema and the size of the arrow buffers. The + * actual batch will be larger than that because of alignment, written IPC tokens, and the + * written record batch metadata. The size of the record batch metadata is proportional to the + * complexity of the schema. + */ + def sizeInBytes: Long = { +// We need to set the row count for getBufferSize to return the actual value. +root.setRowCount(i) +schemaBytes.length + vectors.map(_.getBufferSize).sum + } + + /** + * Append a record to the current batch. + */ + def append(record: T): Unit = { +serializer.write(i, record) +i += 1 + } + + /** + * Write the schema and the current batch in Arrow IPC stream format to the [[OutputStream]]. + */ + def writeIpcStream(output: OutputStream): Unit = { +val channel = newChannel(output) +root.setRowCount(i) +val batch = unloader.getRecordBatch +try { + channel.write(schemaBytes) + MessageSerializer.serialize(channel, batch) + ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT) +} finally { + batch.close() +} + } + + /** + * Reset the serializer. + */ + def reset(): Unit = { +i = 0 +vectors.foreach(_.reset()) + } + + /** + * Close the serializer. 
+ */ + def close(): Unit = root.close() +} + +object ArrowSerializer { + import ArrowEncoderUtils._ + + /** + * Create an [[Iterator]] that
[GitHub] [spark] itholic opened a new pull request, #40624: [SPARK-42995][CONNECT][PYTHON] Migrate Spark Connect DataFrame errors into error class
itholic opened a new pull request, #40624: URL: https://github.com/apache/spark/pull/40624 ### What changes were proposed in this pull request? This PR proposes to migrate the remaining `TypeError`s from `python/pyspark/sql/connect/dataframe.py` into `PySparkTypeError`. ### Why are the changes needed? To migrate all user-facing errors into the PySpark error framework. ### Does this PR introduce _any_ user-facing change? No, it's an internal error framework improvement. ### How was this patch tested? The existing CI should pass.
[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
itholic commented on code in PR #40525: URL: https://github.com/apache/spark/pull/40525#discussion_r1154321016 ## dev/sparktestsupport/modules.py: ## @@ -693,6 +693,56 @@ def __hash__(self): "pyspark.pandas.tests.test_typedef", "pyspark.pandas.tests.test_utils", "pyspark.pandas.tests.test_window", +# unittests - Spark Connect parity tests Review Comment: That makes good sense to me.
[GitHub] [spark] zhmin commented on a diff in pull request #40567: [SPARK-42935] [SQL] Add union required distribution push down
zhmin commented on code in PR #40567: URL: https://github.com/apache/spark/pull/40567#discussion_r1154322170 ## sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/UnionDistributionPushdown.scala: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.exchange + +import java.io.{IOException, ObjectOutputStream} + +import scala.reflect.ClassTag + +import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnionExec, UnionZipExec} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * Pushes parent's required distribution to [[UnionExec]] so that we can + * remove unnecessary shuffle exchange if any child output partitioning matches + * 1) the operator is UnionExec + * 2) the operator's output partitioning can not satisfy parent distribution + * 3) the operator's parent requires hashing or clustering distribution + * 4) the data types of operator's children output attributes are the same + */ +case class UnionDistributionPushdown() extends Rule[SparkPlan] { + + def allChildrenSameDataType(unionExec: UnionExec): Boolean = { +(Seq(unionExec) ++ unionExec.children).map(_.output).transpose.map(attrs => { + val firstAttr = attrs.head + attrs.forall(_.dataType == firstAttr.dataType) +}).forall(_ == true) + } + + def createNewChildrenDistribution(unionExec: UnionExec, +unionRequired: Distribution): Seq[Distribution] = { +unionExec.children.map(unionChild => { + // create output attributes map (union output exprId -> child output) + val attributes = unionExec.output.map(_.exprId).zip(unionChild.output).toMap + unionRequired match { +case distribution: ClusteredDistribution => + val newExpressions = distribution.clustering.map(exp => exp.transformUp { +case attr: Attribute if attributes.contains(attr.exprId) => attributes(attr.exprId) + }) + ClusteredDistribution(newExpressions, distribution.requireAllClusterKeys, 
+distribution.requiredNumPartitions) + +case distribution => distribution + } +}) + } + + override def apply(plan: SparkPlan): SparkPlan = plan.transformUp { +case operator: SparkPlan if conf.getConf(SQLConf.UNION_REQUIRED_DISTRIBUTION_PUSHDOWN) && + operator.children.exists(_.isInstanceOf[UnionExec]) => + val newChildren = operator.children.zipWithIndex.map { +case (unionExec: UnionExec, childIndex: Int) if !unionExec.outputPartitioning. + satisfies(operator.requiredChildDistribution(childIndex)) => + val unionRequired = operator.requiredChildDistribution(childIndex) + unionRequired match { +case _: ClusteredDistribution if allChildrenSameDataType(unionExec) => + val requiredChildDistribution: Seq[Distribution] = +createNewChildrenDistribution(unionExec, unionRequired) + val numPartitions = unionRequired.requiredNumPartitions +.getOrElse(conf.numShufflePartitions) + val outputPartitioning = unionRequired.createPartitioning(numPartitions) + UnionZipExec(unionExec.children, requiredChildDistribution, +outputPartitioning) + +case _: Distribution => unionExec + } + +case (child, _) => child + } + operator.withNewChildren(newChildren) + } +} + +/** + * Class representing partitions of UnionZipRDD, which maintains the list of + * corresponding partitions of parent RDDs. + */ +private[spark] +class UnionZipRDDPartition(@transient val rdds: Seq[RDD[_]], + override val index: Int) extends Partition { + var parents: Array[Partition] = rdds.map(_.partitions(index)).toArray + + override def hashCode(): Int = index + + override def equals(other: Any):
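The two helpers in the rule quoted above (the column-wise data type check and the remapping of clustering keys from the union's output to each child's output) can be modelled outside Spark. The following is only a minimal sketch in plain Python, not the PR's implementation: `Output`, `all_children_same_data_type`, and `remap_clustering` are hypothetical names, attributes are reduced to `(name, data_type)` pairs, and all plans are assumed to have the same number of columns.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical stand-in for a plan's output: a list of (name, data_type) pairs.
Output = List[Tuple[str, str]]


def all_children_same_data_type(union_output: Output, children: Sequence[Output]) -> bool:
    """Column-wise check: every position must carry a single data type across all plans."""
    # zip(...) transposes the per-plan attribute lists into per-column tuples.
    columns = zip(union_output, *children)
    return all(len({dtype for _, dtype in column}) == 1 for column in columns)


def remap_clustering(
    union_output: Output, child_output: Output, clustering: Sequence[str]
) -> List[str]:
    """Rewrite clustering keys from the union's attribute names to the child's, by position."""
    mapping: Dict[str, str] = {u[0]: c[0] for u, c in zip(union_output, child_output)}
    # Keys that do not refer to a union output attribute are left unchanged.
    return [mapping.get(key, key) for key in clustering]
```

The positional mapping mirrors the `exprId -> child output` map built in `createNewChildrenDistribution`; names are used here instead of expression IDs purely to keep the sketch small.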
[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
itholic commented on code in PR #40525: URL: https://github.com/apache/spark/pull/40525#discussion_r1154322879 ## python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py: ## @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin +from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils +from pyspark.testing.connectutils import ReusedConnectTestCase + + +class BinaryOpsParityTests( +BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase +): +@unittest.skip("Fails in Spark Connect, should enable.") Review Comment: Good points. I'm making a list of the failures for each test. Let me do it in follow-ups to avoid adding any further complexity to this PR. 😂
[GitHub] [spark] zhmin commented on a diff in pull request #40567: [SPARK-42935] [SQL] Add union required distribution push down
zhmin commented on code in PR #40567: URL: https://github.com/apache/spark/pull/40567#discussion_r1154322505 ## sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/UnionDistributionPushdown.scala: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.exchange + +import java.io.{IOException, ObjectOutputStream} + +import scala.reflect.ClassTag + +import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{SparkPlan, UnionExec, UnionZipExec} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * Pushes parent's required distribution to [[UnionExec]] so that we can + * remove unnecessary shuffle exchange if any child output partitioning matches + * 1) the operator is UnionExec + * 2) the operator's output partitioning can not satisfy parent distribution + * 3) the operator's parent requires hashing or clustering distribution + * 4) the data types of operator's children output attributes are the same + */ +case class UnionDistributionPushdown() extends Rule[SparkPlan] { + + def allChildrenSameDataType(unionExec: UnionExec): Boolean = { +(Seq(unionExec) ++ unionExec.children).map(_.output).transpose.map(attrs => { + val firstAttr = attrs.head + attrs.forall(_.dataType == firstAttr.dataType) +}).forall(_ == true) + } + + def createNewChildrenDistribution(unionExec: UnionExec, +unionRequired: Distribution): Seq[Distribution] = { +unionExec.children.map(unionChild => { + // create output attributes map (union output exprId -> child output) + val attributes = unionExec.output.map(_.exprId).zip(unionChild.output).toMap + unionRequired match { +case distribution: ClusteredDistribution => + val newExpressions = distribution.clustering.map(exp => exp.transformUp { +case attr: Attribute if attributes.contains(attr.exprId) => attributes(attr.exprId) + }) + ClusteredDistribution(newExpressions, distribution.requireAllClusterKeys, 
+distribution.requiredNumPartitions) + +case distribution => distribution + } +}) + } + + override def apply(plan: SparkPlan): SparkPlan = plan.transformUp { +case operator: SparkPlan if conf.getConf(SQLConf.UNION_REQUIRED_DISTRIBUTION_PUSHDOWN) && + operator.children.exists(_.isInstanceOf[UnionExec]) => + val newChildren = operator.children.zipWithIndex.map { +case (unionExec: UnionExec, childIndex: Int) if !unionExec.outputPartitioning. + satisfies(operator.requiredChildDistribution(childIndex)) => + val unionRequired = operator.requiredChildDistribution(childIndex) + unionRequired match { +case _: ClusteredDistribution if allChildrenSameDataType(unionExec) => + val requiredChildDistribution: Seq[Distribution] = +createNewChildrenDistribution(unionExec, unionRequired) + val numPartitions = unionRequired.requiredNumPartitions +.getOrElse(conf.numShufflePartitions) + val outputPartitioning = unionRequired.createPartitioning(numPartitions) + UnionZipExec(unionExec.children, requiredChildDistribution, +outputPartitioning) + +case _: Distribution => unionExec + } + +case (child, _) => child + } + operator.withNewChildren(newChildren) + } +} + +/** + * Class representing partitions of UnionZipRDD, which maintains the list of + * corresponding partitions of parent RDDs. + */ +private[spark] +class UnionZipRDDPartition(@transient val rdds: Seq[RDD[_]], + override val index: Int) extends Partition { + var parents: Array[Partition] = rdds.map(_.partitions(index)).toArray + + override def hashCode(): Int = index + + override def equals(other: Any):
[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
itholic commented on code in PR #40525: URL: https://github.com/apache/spark/pull/40525#discussion_r1154324225 ## python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py: ## @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin +from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils +from pyspark.testing.connectutils import ReusedConnectTestCase + + +class BinaryOpsParityTests( +BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase +): +@unittest.skip("Fails in Spark Connect, should enable.") Review Comment: Just created a ticket so we don't forget: SPARK-42996
[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
itholic commented on code in PR #40525: URL: https://github.com/apache/spark/pull/40525#discussion_r1154324225 ## python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py: ## @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin +from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils +from pyspark.testing.connectutils import ReusedConnectTestCase + + +class BinaryOpsParityTests( +BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase +): +@unittest.skip("Fails in Spark Connect, should enable.") Review Comment: Just created ticket just in case to not forget: SPARK-42996 ## python/pyspark/pandas/tests/connect/data_type_ops/test_parity_binary_ops.py: ## @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.data_type_ops.test_binary_ops import BinaryOpsTestsMixin +from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils +from pyspark.testing.connectutils import ReusedConnectTestCase + + +class BinaryOpsParityTests( +BinaryOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase +): +@unittest.skip("Fails in Spark Connect, should enable.") Review Comment: Created ticket just in case to not forget: SPARK-42996 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] itholic commented on a diff in pull request #40525: [SPARK-42859][CONNECT][PS] Basic support for pandas API on Spark Connect
itholic commented on code in PR #40525: URL: https://github.com/apache/spark/pull/40525#discussion_r1154325557 ## dev/sparktestsupport/modules.py: ## @@ -693,6 +693,56 @@ def __hash__(self): "pyspark.pandas.tests.test_typedef", "pyspark.pandas.tests.test_utils", "pyspark.pandas.tests.test_window", +# unittests - Spark Connect parity tests Review Comment: Updated. Thanks! :-)
[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error
shrprasa commented on PR #40258: URL: https://github.com/apache/spark/pull/40258#issuecomment-1491795763
> according to the [code in 2.3](https://github.com/apache/spark/blob/branch-2.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L190), I think we should call `distinct` in line 345

@cloud-fan Yes, that should also work, but making the change there would extend its impact to many more scenarios, whereas calling `distinct` where I have placed it keeps the scope very limited.
[GitHub] [spark] johanl-db commented on a diff in pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy
johanl-db commented on code in PR #40545: URL: https://github.com/apache/spark/pull/40545#discussion_r1154397094 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala: ## @@ -220,9 +220,20 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { attributeReference match { case attr @ FileSourceMetadataAttribute( Review Comment: We only clean up the individual metadata fields, but the `_metadata` attribute itself still has the right information to match on `FileSourceMetadataAttribute`:
```
def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
  // Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
  // avoids confusion by mapping back to [[metadataSchemaFields]].
  val fields = metadataSchemaFields(fileFormat)
    .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
  FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields))
}
```
The following is true:
```
val metadata_attr = createFileMetadataCol(fileFormat: FileFormat)
metadata_attr.metadata.getBoolean(METADATA_COL_ATTR_KEY) == true
metadata_attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) == true
metadata_attr.dataType.forall(_.metadata == Metadata.empty)
```
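The point made in the comment above (metadata is stripped from the individual fields while the enclosing `_metadata` attribute keeps its own markers) can be illustrated with a small model. This is a sketch in plain Python rather than Spark code: the `Field` and `Attribute` classes and the `cleanup` helper are hypothetical, and the string values given to the two key constants are assumptions that may not match the real ones.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, Tuple

# Assumed values for the constants named in the comment; the real strings may differ.
METADATA_COL_ATTR_KEY = "__metadata_col"
FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for a struct field with optional metadata."""
    name: str
    metadata: Dict[str, bool] = field(default_factory=dict)


@dataclass(frozen=True)
class Attribute:
    """Hypothetical stand-in for an attribute whose data type is a struct of fields."""
    name: str
    fields: Tuple[Field, ...]
    metadata: Dict[str, bool] = field(default_factory=dict)


def cleanup(f: Field) -> Field:
    """Return a copy of the field with its metadata removed."""
    return replace(f, metadata={})


def create_file_metadata_col(schema_fields: Tuple[Field, ...]) -> Attribute:
    """Strip per-field metadata, but mark the enclosing attribute itself."""
    return Attribute(
        name="_metadata",
        fields=tuple(cleanup(f) for f in schema_fields),
        metadata={METADATA_COL_ATTR_KEY: True, FILE_SOURCE_METADATA_COL_ATTR_KEY: True},
    )
```

In this model a matcher that inspects only the attribute-level metadata still recognises the column, even though every nested field reports empty metadata.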
[GitHub] [spark] cloud-fan commented on pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy
cloud-fan commented on PR #40545: URL: https://github.com/apache/spark/pull/40545#issuecomment-1491879620 Thanks, merging to master!
[GitHub] [spark] cloud-fan closed pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy
cloud-fan closed pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy URL: https://github.com/apache/spark/pull/40545
[GitHub] [spark] cloud-fan commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error
cloud-fan commented on PR #40258: URL: https://github.com/apache/spark/pull/40258#issuecomment-1491886464 If you are really worried about regressions, we can add a legacy config to fall back to the old code. I don't agree with making code changes that fix the problem in only one particular code path when we know other code paths have the same problem as well.
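The trade-off discussed in this thread, de-duplicating resolution candidates in the shared code path while guarding the new behaviour behind a legacy flag, might look roughly like the sketch below. It is plain Python rather than Spark code; the flag name `LEGACY_FLAG` and both functions are invented for illustration and do not correspond to any existing Spark configuration or API.

```python
from typing import Dict, List

# Hypothetical flag name, invented for this illustration only.
LEGACY_FLAG = "legacy.keepDuplicateCandidates"


def resolve_candidates(candidates: List[str], conf: Dict[str, bool]) -> List[str]:
    """Return the candidates, de-duplicated unless the legacy flag is set."""
    if conf.get(LEGACY_FLAG, False):
        # Old behaviour: the same attribute reached via two paths is kept twice.
        return list(candidates)
    # New behaviour: order-preserving de-duplication, analogous to calling `distinct`.
    return list(dict.fromkeys(candidates))


def is_ambiguous(candidates: List[str], conf: Dict[str, bool]) -> bool:
    """A reference is ambiguous when more than one distinct candidate remains."""
    return len(resolve_candidates(candidates, conf)) > 1
```

Placing the de-duplication in the shared function fixes every caller at once, and the flag gives users a way back to the previous behaviour if a regression shows up.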
[GitHub] [spark] allisonwang-db commented on a diff in pull request #40575: [SPARK-42945][CONNECT] Support PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect
allisonwang-db commented on code in PR #40575: URL: https://github.com/apache/spark/pull/40575#discussion_r1154486470 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala: ## @@ -109,23 +118,41 @@ class SparkConnectService(debug: Boolean) */ private def handleError[V]( opType: String, - observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = { -case se: SparkException if isPythonExecutionException(se) => - logError(s"Error during: $opType", se) - observer.onError( - StatusProto.toStatusRuntimeException(buildStatusFromThrowable(se.getCause))) - -case e: Throwable if e.isInstanceOf[SparkThrowable] || NonFatal.apply(e) => - logError(s"Error during: $opType", e) - observer.onError(StatusProto.toStatusRuntimeException(buildStatusFromThrowable(e))) - -case e: Throwable => - logError(s"Error during: $opType", e) - observer.onError( -Status.UNKNOWN - .withCause(e) - .withDescription(StringUtils.abbreviate(e.getMessage, 2048)) - .asRuntimeException()) + observer: StreamObserver[V], + userId: String, + sessionId: String): PartialFunction[Throwable, Unit] = { +val session = + SparkConnectService +.getOrCreateIsolatedSession(userId, sessionId) +.session +val stackTraceEnabled = try { + session.conf.get( + org.apache.spark.sql.internal.SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED.key).toBoolean +} catch { + case NonFatal(_) => true +} + +{ Review Comment: Do you mean the try-catch block or the partial function block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
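The diff above reads a boolean flag from the session configuration and falls back to `true` when the lookup or parsing fails, then uses it to decide how much error detail to return. A minimal model of that pattern in plain Python follows. It is a sketch, not the server code: the dictionary-based `conf`, the function names, and the assumption that `STACKTRACE_KEY` is the key behind `SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED` are all illustrative.

```python
import traceback
from typing import Dict

# Assumed to be the key behind SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED; treat as hypothetical.
STACKTRACE_KEY = "spark.sql.pyspark.jvmStacktrace.enabled"


def stack_trace_enabled(conf: Dict[str, str]) -> bool:
    """Read the flag, defaulting to True when the key is missing or unparsable."""
    try:
        value = conf[STACKTRACE_KEY].strip().lower()
        if value not in ("true", "false"):
            raise ValueError(value)
        return value == "true"
    except (KeyError, ValueError, AttributeError):
        # Mirrors `case NonFatal(_) => true` in the quoted diff.
        return True


def build_error_message(exc: BaseException, conf: Dict[str, str]) -> str:
    """Return a one-line summary, or the full formatted trace when the flag is on."""
    if not stack_trace_enabled(conf):
        return f"{type(exc).__name__}: {exc}"
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
```

Defaulting to the more verbose output on failure keeps debugging information available when the session configuration cannot be read.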
[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40574: [SPARK-42942][SQL] Support coalesce table cache stage partitions
jaceklaskowski commented on code in PR #40574: URL: https://github.com/apache/spark/pull/40574#discussion_r1154480284 ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQERead.scala: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, UnaryExecNode} + +abstract class AQERead extends UnaryExecNode { + def child: SparkPlan + def partitionSpecs: Seq[ShufflePartitionSpec] + + assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at least one partition") + + override final def output: Seq[Attribute] = child.output + override final def supportsColumnar: Boolean = child.supportsColumnar + override final def supportsRowBased: Boolean = child.supportsRowBased + + def outputPartitionWithCoalesced(numPartitions: Int): Partitioning = { +// For coalesced shuffle read, the data distribution is not changed, only the number of +// partitions is changed. +child.outputPartitioning match { + case h: HashPartitioning => +CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = numPartitions)) + case r: RangePartitioning => +CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = numPartitions)) + // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses + // `RoundRobinPartitioning` but we don't need to retain the number of partitions. + case r: RoundRobinPartitioning => +r.copy(numPartitions = numPartitions) + case other@SinglePartition => Review Comment: nit: spaces around `@`? ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQERead.scala: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan, UnaryExecNode} + +abstract class AQERead extends UnaryExecNode { + def child: SparkPlan + def partitionSpecs: Seq[ShufflePartitionSpec] + + assert(partitionSpecs.nonEmpty, s"${getClass.getSimpleName} requires at least one partition") + + override final def output: Seq[Attribute] = child.output + override final def supportsColumnar: Boolean = child.supportsColumnar + override final def supportsRowBased: Boolean = child.supportsRowBased + + def outputPartitionWithCoalesced(numPartitions: Int): Partitioning = { +// For coalesced shuffle read, the data distribution is not changed, only the number of +// partitions is changed. +child.outputPartitioning match { + case h: HashPartitioning => +CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = numPartitions)) + case r: RangePartitioning => +CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = numPart
[GitHub] [spark] allisonwang-db commented on a diff in pull request #40575: [SPARK-42945][CONNECT] Support PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect
allisonwang-db commented on code in PR #40575: URL: https://github.com/apache/spark/pull/40575#discussion_r1154490873 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala: ## @@ -73,18 +74,26 @@ class SparkConnectService(debug: Boolean) classes.toSeq } - private def buildStatusFromThrowable(st: Throwable): RPCStatus = { + private def buildStatusFromThrowable( + st: Throwable, + stackTraceEnabled: Boolean = false): RPCStatus = { +val errorInfo = ErrorInfo + .newBuilder() + .setReason(st.getClass.getName) + .setDomain("org.apache.spark") + .putMetadata("classes", compact(render(allClasses(st.getClass).map(_.getName + +val withStackTrace = if (stackTraceEnabled) { + val stackTrace = ExceptionUtils.getStackTrace(st) + errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace, 4096)) Review Comment: This is a good question. I wonder if we should make this configurable for users. ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala: ## @@ -73,18 +74,26 @@ class SparkConnectService(debug: Boolean) classes.toSeq } - private def buildStatusFromThrowable(st: Throwable): RPCStatus = { + private def buildStatusFromThrowable( + st: Throwable, + stackTraceEnabled: Boolean = false): RPCStatus = { +val errorInfo = ErrorInfo + .newBuilder() + .setReason(st.getClass.getName) + .setDomain("org.apache.spark") + .putMetadata("classes", compact(render(allClasses(st.getClass).map(_.getName + +val withStackTrace = if (stackTraceEnabled) { + val stackTrace = ExceptionUtils.getStackTrace(st) + errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace, 4096)) +} else { + errorInfo +} + RPCStatus .newBuilder() .setCode(RPCCode.INTERNAL_VALUE) - .addDetails( -ProtoAny.pack( - ErrorInfo -.newBuilder() -.setReason(st.getClass.getName) -.setDomain("org.apache.spark") -.putMetadata("classes", compact(render(allClasses(st.getClass).map(_.getName -.build())) + 
.addDetails(ProtoAny.pack(withStackTrace.build())) Review Comment: The stack trace is disabled by default, so it should not affect typical users. But I agree it can be hard to debug such issues. One solution could be to make the stack trace size configurable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
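As an illustration of the "make the stack trace size configurable" idea raised in the review comment above, here is a minimal standalone sketch in plain Scala. The helper name `abbreviatedStackTrace` and its `maxLen` parameter are hypothetical and are not part of the PR or of Spark; the truncation is hand-written rather than delegated to `StringUtils.abbreviate` so that the example has no external dependencies.

```scala
import java.io.{PrintWriter, StringWriter}

object StackTraceSketch {
  // Hypothetical helper (not a Spark API): renders the stack trace of `t`
  // and truncates it to at most `maxLen` characters, ending in "..." when cut.
  def abbreviatedStackTrace(t: Throwable, maxLen: Int): String = {
    require(maxLen >= 4, "maxLen must leave room for the ellipsis")
    val sw = new StringWriter()
    t.printStackTrace(new PrintWriter(sw, true))
    val full = sw.toString
    if (full.length <= maxLen) full else full.take(maxLen - 3) + "..."
  }

  def main(args: Array[String]): Unit = {
    // The limit is an ordinary argument here; in a real service it would
    // presumably come from a configuration entry (an assumption, not shown).
    val trace = abbreviatedStackTrace(new RuntimeException("boom"), maxLen = 64)
    println(trace.length <= 64)
  }
}
```

Passing the limit as a parameter keeps the truncation logic independent of where the value is configured.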
[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error
shrprasa commented on PR #40258:
URL: https://github.com/apache/spark/pull/40258#issuecomment-1491954910
> If you really worry about regression, we can add a legacy config to fall back to the old code. I don't agree to make code changes that only fix the problem in one particular code path, while we know other code paths have the same problem as well.

OK, I will update the PR with the suggested change.
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40609: [SPARK-42316][SQL] Assign name to _LEGACY_ERROR_TEMP_2044
Hisoka-X commented on code in PR #40609:
URL: https://github.com/apache/spark/pull/40609#discussion_r1154512853
## sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala: ##
@@ -625,6 +625,20 @@ class QueryExecutionErrorsSuite
     }
   }
 
+  test("BINARY_ARITHMETIC_CAUSE_OVERFLOW: byte plus byte result overflow") {
+    checkError(
+      exception = intercept[SparkArithmeticException] {
+        sql(s"select CAST('5' AS TINYINT) + CAST('5' AS TINYINT)").collect()
+      },
+      errorClass = "BINARY_ARITHMETIC_CAUSE_OVERFLOW",
+      parameters = Map(
+        "value1" -> "5",
+        "symbol" -> "+",
Review Comment:
> Hi, I used `select CAST('32767' AS TINYINT) + CAST('32767' AS TINYINT)` but the result is null. Is that acceptable? If not, I will fix it.
> https://user-images.githubusercontent.com/32387433/229014398-cbae18d7-a7bb-4de2-a44e-a44703ce0f99.png

Forget it, I know why. Thanks.
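For readers following the overflow discussion above, here is a minimal standalone sketch (plain Scala, no Spark) of checked byte addition. `addExactByte` is a hypothetical helper written for this illustration and is not Spark's implementation. It only shows the arithmetic: TINYINT corresponds to a signed byte with range -128 to 127, so 5 + 5 fits, while a pair such as 100 + 100 does not, and 32767 is already outside the range before any addition happens.

```scala
object ByteOverflowSketch {
  // Hypothetical helper (not a Spark API): adds two bytes and fails on overflow.
  def addExactByte(a: Byte, b: Byte): Byte = {
    val sum: Int = a + b // Byte operands are widened to Int before the addition
    if (sum < Byte.MinValue || sum > Byte.MaxValue) {
      throw new ArithmeticException(s"$a + $b overflows the byte range")
    }
    sum.toByte
  }

  def main(args: Array[String]): Unit = {
    println(addExactByte(5, 5))                               // 10
    println(scala.util.Try(addExactByte(100, 100)).isFailure) // true
  }
}
```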
[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40607: [SPARK-42993][ML][CONNECT] Make PyTorch Distributor support Spark Connect
jaceklaskowski commented on code in PR #40607:
URL: https://github.com/apache/spark/pull/40607#discussion_r1154524221
## python/pyspark/ml/torch/distributor.py: ##
@@ -581,12 +590,12 @@ def _run_distributed_training(
             f"Started distributed training with {self.num_processes} executor proceses"
Review Comment:
s/proceses/processes
[GitHub] [spark] dongjoon-hyun commented on pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer
dongjoon-hyun commented on PR #40589: URL: https://github.com/apache/spark/pull/40589#issuecomment-1492257092 cc @sunchao , too
[GitHub] [spark] dongjoon-hyun closed pull request #40596: [SPARK-42973][CONNECT][BUILD] Upgrade buf to v1.16.0
dongjoon-hyun closed pull request #40596: [SPARK-42973][CONNECT][BUILD] Upgrade buf to v1.16.0 URL: https://github.com/apache/spark/pull/40596
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154700264
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
Review Comment:
So the type of value in the state will be Long? SGTM.
[GitHub] [spark] rangadi commented on a diff in pull request #40561: [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1154705202
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ##
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))
Review Comment:
If we are saving a long, I would suggest changing the name to `expiresAtMs` or `expiresAtMicros`, depending on its unit.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer
dongjoon-hyun commented on code in PR #40589:
URL: https://github.com/apache/spark/pull/40589#discussion_r1154735386
## sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala: ##
@@ -166,6 +177,14 @@ class SparkSessionExtensions {
     runtimeOptimizerRules += builder
   }
 
+  /**
+   * Inject a rule that can override the query stage optimizer phase of adaptive query
+   * execution.
+   */
+  def injectQueryStageOptimizerRule(builder: QueryStagePrepRuleBuilder): Unit = {
Review Comment:
Oh, this looks like a typo for `QueryStageOptimizerRuleBuilder`. We need a builder of type `QueryStageOptimizerRuleBuilder` instead of `QueryStagePrepRuleBuilder`, right?
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40589: [SPARK-38697][SQL] Extend SparkSessionExtensions to inject rules into AQE query stage optimizer
dongjoon-hyun commented on code in PR #40589: URL: https://github.com/apache/spark/pull/40589#discussion_r1154736185 ## sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala: ## @@ -1161,3 +1177,16 @@ object AddLimit extends Rule[LogicalPlan] { case _ => Limit(Literal(1), plan) } } + +object RequireAtLeaseTwoPartitions extends Rule[SparkPlan] { + override def apply(plan: SparkPlan): SparkPlan = { +val readOpt = plan.find(_.isInstanceOf[AQEShuffleReadExec]) +if (readOpt.exists(_.outputPartitioning.numPartitions == 1)) { + plan.transform { +case read: AQEShuffleReadExec => read.child + } +} else { + plan Review Comment: indentation?
[GitHub] [spark] arunsimhateachmint opened a new pull request, #40625: [TDE-88] Access control on spark-3.3.2
arunsimhateachmint opened a new pull request, #40625:
URL: https://github.com/apache/spark/pull/40625
### What changes were proposed in this pull request?
Implement access control on a database via row-level security on metadata.
### Why are the changes needed?
Implementing this would bring access control to spark-standalone.
### Does this PR introduce _any_ user-facing change?
Using doAs for the thrift-server or --proxy-user for the shell would restrict access.
### How was this patch tested?
Created 2 users (with row-level restrictions on the metadata db).
- To test --proxy-user: started a shell with the --proxy-user flag and ran show tables / count rows on a table (with access / without access).
- To test doAs: started a thrift server, connected as a test user via beeline, and ran show tables / count rows on a table (with access / without access).
[GitHub] [spark] arunsimhateachmint closed pull request #40625: [TDE-88] Access control on spark-3.3.2
arunsimhateachmint closed pull request #40625: [TDE-88] Access control on spark-3.3.2 URL: https://github.com/apache/spark/pull/40625
[GitHub] [spark] ritikam2 commented on pull request #18994: [SPARK-21784][SQL] Adds support for defining informational primary key and foreign key constraints using ALTER TABLE DDL.
ritikam2 commented on PR #18994:
URL: https://github.com/apache/spark/pull/18994#issuecomment-1492432645
Hi Suresh, I am trying to look into your changes, but I am not able to find the following.

In SparkSqlParser, I cannot find `AddTableConstraintContext`:
```
override def visitAddTableConstraint(
    ctx: AddTableConstraintContext): LogicalPlan = withOrigin(ctx) {
  val tableIdentifier = visitTableIdentifier(ctx.tableIdentifier)
```
In SparkSqlParser, I cannot find `TableConstraintContext`:
```
override def visitTableConstraint(
    ctx: TableConstraintContext): TableConstraint = withOrigin(ctx) {
  val keyColNames = visitIdentifierList(ctx.keyColNames).toArray
```
[GitHub] [spark] clownxc opened a new pull request, #40626: [SPARK-42860][SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode
clownxc opened a new pull request, #40626:
URL: https://github.com/apache/spark/pull/40626
### What changes were proposed in this pull request?
This PR proposes to provide an explain mode that prints only the parsed and analysed logical plans.
### Why are the changes needed?
To validate the SQL query before submitting the job. We are currently using df.explain(extended=true), which generates the parsed, analysed, and optimised logical plans and the physical plan. But generating the optimised logical plan sometimes takes more time.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Pass GitHub Actions
[GitHub] [spark] bjornjorgensen commented on pull request #40622: fix typo in ResourceRequest.equals()
bjornjorgensen commented on PR #40622:
URL: https://github.com/apache/spark/pull/40622#issuecomment-1492468583
@thyecust "vendor == vendor is always true"
In line 84, the val vendor field is declared as Optional[String], which means it can have a value of Some(String) when a vendor name is provided, or None when no vendor name is specified.
The failed test is not related to this change. Can you restart the failed test?
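To make the quoted remark "vendor == vendor is always true" concrete, here is a minimal sketch of that kind of self-comparison typo in an equality check. The class `Request` and the methods `buggyEquals` and `fixedEquals` are hypothetical stand-ins written for this illustration; this is not the actual `ResourceRequest` source, and `Option[String]` is used here only for simplicity.

```scala
// Hypothetical stand-in class; NOT the actual Spark ResourceRequest source.
final class Request(val name: String, val vendor: Option[String]) {
  // Typo version: compares this.vendor with itself, so the vendor field
  // never influences the result.
  def buggyEquals(that: Request): Boolean =
    name == that.name && vendor == vendor

  // Intended version: compares this.vendor with the other object's vendor.
  def fixedEquals(that: Request): Boolean =
    name == that.name && vendor == that.vendor
}

object EqualsTypoDemo {
  def main(args: Array[String]): Unit = {
    val a = new Request("gpu", Some("nvidia.com"))
    val b = new Request("gpu", None)
    println(a.buggyEquals(b)) // true: the differing vendor is ignored
    println(a.fixedEquals(b)) // false: the vendors differ
  }
}
```

The two requests differ only in `vendor`, which is exactly the case the self-comparison cannot detect.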
[GitHub] [spark] dtenedor commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch
dtenedor commented on code in PR #40615: URL: https://github.com/apache/spark/pull/40615#discussion_r1154843262 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union} +import org.apache.datasketches.memory.WritableMemory + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +/** + * This datasketchesAggregates file is intended to encapsulate all of the + * aggregate functions that utilize Datasketches sketch objects as intermediate + * aggregation buffers. + * + * The HllSketchAggregate sealed trait is meant to be extended by the aggregate + * functions which utilize instances of HllSketch to count uniques. 
+ */ +sealed trait HllSketchAggregate + extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] { + + // These are used as params when instantiating the HllSketch object + // and are set by the case classes' args that extend this trait + + def lgConfigK: Int + def tgtHllType: String + + // From here on, these are the shared default implementations for TypedImperativeAggregate + + /** Aggregate functions which utilize HllSketch instances should never return null */ + override def nullable: Boolean = false + + /** + * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params. + * + * @return an HllSketch instance + */ + override def createAggregationBuffer(): HllSketch = { +// scalastyle:off caselocale +new HllSketch(lgConfigK, TgtHllType.valueOf(tgtHllType.toUpperCase)) +// scalastyle:on caselocale + } + + /** + * Evaluate the input row and update the HllSketch instance with the row's value. + * The update function only supports a subset of Spark SQL types, and an + * UnsupportedOperationException will be thrown for unsupported types. + * + * @param sketch The HllSketch instance. + * @param input an input row + */ + override def update(sketch: HllSketch, input: InternalRow): HllSketch = { +val v = child.eval(input) +if (v != null) { + child.dataType match { +// Update implemented for all types supported by HllSketch +// Spark SQL doesn't have equivalent types for ByteBuffer or char[] so leave those out +case IntegerType => sketch.update(v.asInstanceOf[Int]) +case LongType => sketch.update(v.asInstanceOf[Long]) +case DoubleType => sketch.update(v.asInstanceOf[Double]) Review Comment: I don't think it is a good idea to support IEEE floating-point input types for these functions. There is imprecision when comparing them to each other which can lead to instability in the results. By the same token, performing a GROUP BY operation by floating-point values is not a good idea. 
For reference, see these function definitions which do not include floating-point input types [1]. On the other hand, I don't see any reason why we can't support DecimalType, Datetime or Interval types (we can leave a TODO comment to support those in another PR if we want). [1] https://github.com/google/zetasql/blob/master/docs/hll_functions.md ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LI
[GitHub] [spark] sunchao commented on a diff in pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader
sunchao commented on code in PR #39950:
URL: https://github.com/apache/spark/pull/39950#discussion_r1154849296
## sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java: ##
@@ -89,17 +90,28 @@
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
+    initialize(inputSplit, taskAttemptContext, Option.empty());
+  }
+
+  public void initialize(
+      InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext,
+      Option fileFooter) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
-
-    ParquetReadOptions options = HadoopReadOptions
-      .builder(configuration, file)
-      .withRange(split.getStart(), split.getStart() + split.getLength())
-      .withCodecFactory(new ParquetCodecFactory(configuration, 0))
-      .build();
-    ParquetFileReader fileReader = new ParquetFileReader(
-        HadoopInputFile.fromPath(file, configuration), options);
+    ParquetFileReader fileReader;
+    if (fileFooter.isDefined()) {
+      fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
Review Comment:
yes.. this will be an issue unless Spark is upgraded to Parquet 1.12.4
## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala: ##
@@ -205,11 +205,22 @@ class ParquetFileFormat
       val sharedConf = broadcastedHadoopConf.value.value
 
-      lazy val footerFileMetaData =
-        ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val fileFooter = if (enableVectorizedReader) {
+        // This can avoid reading the footer twice(currently only optimize for vectorized read).
Review Comment:
not addressed yet
[GitHub] [spark] RyanBerti commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch
RyanBerti commented on code in PR #40615: URL: https://github.com/apache/spark/pull/40615#discussion_r1154926480 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union} +import org.apache.datasketches.memory.WritableMemory + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +/** + * This datasketchesAggregates file is intended to encapsulate all of the + * aggregate functions that utilize Datasketches sketch objects as intermediate + * aggregation buffers. + * + * The HllSketchAggregate sealed trait is meant to be extended by the aggregate + * functions which utilize instances of HllSketch to count uniques. 
+ */ +sealed trait HllSketchAggregate + extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] { + + // These are used as params when instantiating the HllSketch object + // and are set by the case classes' args that extend this trait + + def lgConfigK: Int + def tgtHllType: String + + // From here on, these are the shared default implementations for TypedImperativeAggregate + + /** Aggregate functions which utilize HllSketch instances should never return null */ + override def nullable: Boolean = false + + /** + * Instantiate an HllSketch instance using the lgConfigK and tgtHllType params. + * + * @return an HllSketch instance + */ + override def createAggregationBuffer(): HllSketch = { +// scalastyle:off caselocale Review Comment: I initially had that in place, but removed it because I wasn't sure the enum `valueOf` call would handle non-English locales. I can update it to use `Locale.ROOT` though; I just wasn't familiar with that implementation.
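For readers unfamiliar with the `caselocale` concern discussed above, here is a minimal sketch of locale-independent enum parsing. It assumes a stand-in enumeration (`TargetType`) and a helper (`parse`), both hypothetical and not part of the PR; the only real APIs used are `java.util.Locale.ROOT`, `Locale.forLanguageTag`, and `String.toUpperCase(Locale)`.

```scala
import java.util.Locale

// Hypothetical stand-in for the DataSketches TgtHllType enum, so that this
// sketch runs without the datasketches dependency.
object TargetType extends Enumeration {
  val HLL_4, HLL_6, HLL_8 = Value
}

object LocaleSafeParsing {
  // Upper-case with Locale.ROOT so the result never depends on the JVM default locale.
  def parse(name: String): TargetType.Value =
    TargetType.withName(name.toUpperCase(Locale.ROOT))

  def main(args: Array[String]): Unit = {
    val turkish = Locale.forLanguageTag("tr-TR")
    // Under Turkish casing rules, 'i' upper-cases to dotted 'İ' (U+0130), not 'I'.
    println("hll_i".toUpperCase(turkish) == "HLL_I")     // false
    println("hll_i".toUpperCase(Locale.ROOT) == "HLL_I") // true
    println(parse("hll_8"))                              // HLL_8
  }
}
```

The point is only that the case conversion, not the enum lookup itself, is what is locale-sensitive; normalising with `Locale.ROOT` before the lookup keeps behaviour the same on every JVM.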
[GitHub] [spark] RyanBerti commented on a diff in pull request #40615: [WIP][SPARK-16484][SQL] Add support for Datasketches HllSketch
RyanBerti commented on code in PR #40615: URL: https://github.com/apache/spark/pull/40615#discussion_r1154928718 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union} +import org.apache.datasketches.memory.WritableMemory + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription} +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, IntegerType, LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String + + +/** + * This datasketchesAggregates file is intended to encapsulate all of the + * aggregate functions that utilize Datasketches sketch objects as intermediate + * aggregation buffers. + * + * The HllSketchAggregate sealed trait is meant to be extended by the aggregate + * functions which utilize instances of HllSketch to count uniques. 
+ */ +sealed trait HllSketchAggregate + extends TypedImperativeAggregate[HllSketch] with UnaryLike[Expression] { + + // These are used as params when instantiating the HllSketch object + // and are set by the case classes' args that extend this trait + + def lgConfigK: Int + def tgtHllType: String + + // From here on, these are the shared default implementations for TypedImperativeAggregate + + /** Aggregate functions which utilize HllSketch instances should never return null */ Review Comment: Just ran a quick test, and both null and empty return 0:
```
val sketch = new HllSketch()
assert(sketch.getEstimate == 0)
val nullString: String = null
sketch.update(nullString)
assert(sketch.getEstimate == 0)
```
[GitHub] [spark] WweiL commented on pull request #40586: [SPARK-42939][SS][CONNECT] Core streaming Python API for Spark Connect
WweiL commented on PR #40586: URL: https://github.com/apache/spark/pull/40586#issuecomment-1492687197 I have less expertise in protobuf; otherwise LGTM
[GitHub] [spark] ueshin opened a new pull request, #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct
ueshin opened a new pull request, #40627: URL: https://github.com/apache/spark/pull/40627

### What changes were proposed in this pull request?

Fix `DataFrame.collect` with null struct.

### Why are the changes needed?

There is a behavior difference when collecting `null` struct:

In Spark Connect:

```py
>>> df = spark.sql("values (1, struct('a' as x)), (2, struct(null as x)), (null, null) as t(a, b)")
>>> df.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- x: string (nullable = true)

>>> df.show()
+----+------+
|   a|     b|
+----+------+
|   1|   {a}|
|   2|{null}|
|null|  null|
+----+------+

>>> df.collect()
[Row(a=1, b=Row(x='a')), Row(a=2, b=Row(x=None)), Row(a=None, b=)]
```

whereas PySpark:

```py
>>> df.collect()
[Row(a=1, b=Row(x='a')), Row(a=2, b=Row(x=None)), Row(a=None, b=None)]
```

### Does this PR introduce _any_ user-facing change?

The behavior fix.

### How was this patch tested?

Added/modified the related tests.
[GitHub] [spark] zhenlineo opened a new pull request, #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition
zhenlineo opened a new pull request, #40628: URL: https://github.com/apache/spark/pull/40628

### What changes were proposed in this pull request?

Implements missing methods in Dataset: `foreach` and `foreachPartition`. This PR is based on top of https://github.com/apache/spark/pull/40581. The implementation of `foreachPartition` is built on `mapPartitions` + `count`.

### Why are the changes needed?

Add missing methods in Dataset.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

E2E tests.
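To illustrate the "`mapPartitions` + `count`" idea mentioned in the description, here is a minimal local sketch. `MiniDataset` is a hypothetical stand-in built on plain Scala iterators; it is not the Spark Connect `Dataset` API and makes no claim about how the PR is actually written. It only shows why a lazy per-partition transformation followed by an action is enough to run a side-effecting function once per partition.

```scala
import scala.collection.mutable.ArrayBuffer

object ForeachViaMapPartitions {
  // Hypothetical stand-in for a lazily evaluated, partitioned dataset.
  final class MiniDataset[T](private val parts: Seq[() => Iterator[T]]) {
    // Lazy: nothing runs until an action such as count() is called.
    def mapPartitions[U](f: Iterator[T] => Iterator[U]): MiniDataset[U] =
      new MiniDataset[U](parts.map(p => () => f(p())))

    // Action: forces every partition and counts the resulting rows.
    def count(): Long = parts.map(p => p().size.toLong).sum

    // Run f for its side effects, emit no rows, then force evaluation with count().
    def foreachPartition(f: Iterator[T] => Unit): Unit = {
      mapPartitions[Int] { it => f(it); Iterator.empty }.count()
      ()
    }
  }

  def main(args: Array[String]): Unit = {
    val ds = new MiniDataset[Int](Seq(() => Iterator(1, 2), () => Iterator(3)))
    val seen = ArrayBuffer.empty[Int]
    ds.foreachPartition(it => it.foreach(seen += _))
    println(seen.toList) // List(1, 2, 3)
  }
}
```

In this toy version the count is always zero and is discarded; its only job is to trigger the lazy `mapPartitions` step.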
[GitHub] [spark] github-actions[bot] commented on pull request #39136: Create stable names for dynamically generated classes
github-actions[bot] commented on PR #39136: URL: https://github.com/apache/spark/pull/39136#issuecomment-1492754069 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #39114: [SPARK-40708][SQL][WIP] Auto update partition statistics based on write metrics
github-actions[bot] commented on PR #39114: URL: https://github.com/apache/spark/pull/39114#issuecomment-1492754086 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] closed pull request #39130: [SPARK-xxxxx][DOCUMENTATION][PYTHON] Fix grammar in docstring for toDF().
github-actions[bot] closed pull request #39130: [SPARK-xxxxx][DOCUMENTATION][PYTHON] Fix grammar in docstring for toDF(). URL: https://github.com/apache/spark/pull/39130
[GitHub] [spark] zhenlineo commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
zhenlineo commented on PR #40564: URL: https://github.com/apache/spark/pull/40564#issuecomment-1492754667 cc @LuciferYang Can we merge this?
[GitHub] [spark] liuzqt opened a new pull request, #40629: [SPARK-42980][CORE] Implement a lightweight SmallBroadcast
liuzqt opened a new pull request, #40629: URL: https://github.com/apache/spark/pull/40629

### What changes were proposed in this pull request?

The current `TorrentBroadcast` implementation was originally designed for large data transmission, where the driver might become the bottleneck and memory might also be a concern. It is therefore fairly heavy and comes with some fixed overhead:

- torrent protocol: more round-trip traffic
- disk-level persistence and `BlockManager`: extra overhead

This makes it inefficient for small data transmission.

We can have a lightweight broadcast implementation, e.g. `SmallBroadcast`, which implements a star-topology broadcast (avoiding the unnecessary round-trip traffic) and may be in-memory only.

Note: The current factory-style API seems to encourage only one broadcast style at runtime; however, this small broadcast implementation is a supplement to `TorrentBroadcast`, so I introduce new small broadcast APIs along the code path.

### Why are the changes needed?

Provide a lightweight implementation of `Broadcast`, suitable for situations where the data size is small and latency is critical.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests. More test cases come from `BroadcastSuite`.
[GitHub] [spark] aokolnychyi opened a new pull request, #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi opened a new pull request, #40630: URL: https://github.com/apache/spark/pull/40630

### What changes were proposed in this pull request?

This PR fixes `TableOutputResolver` to use correct column paths in error messages for arrays and maps.

### Why are the changes needed?

These changes are needed to have accurate error messages when there is a type mismatch inside arrays and maps.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests.
[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155030408 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -203,16 +204,17 @@ object TableOutputResolver { addError(s"Cannot write nullable values to map of non-nulls: '${colPath.quoted}'") None } else { - val keyParam = NamedLambdaVariable("k", inputType.keyType, nullable = false) - val fakeKeyAttr = AttributeReference("k", expectedType.keyType, nullable = false)() + val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false) + val fakeKeyAttr = AttributeReference("key", expectedType.keyType, nullable = false)() val resKey = reorderColumnsByName( -Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath :+ "key") Review Comment: Error messages included `map_col.k.key.struct_col`. They include `map_col.key.struct_col` now.
[GitHub] [spark] aokolnychyi commented on pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on PR #40630: URL: https://github.com/apache/spark/pull/40630#issuecomment-1492767210 cc @cloud-fan @dongjoon-hyun @viirya @huaxingao @sunchao
[GitHub] [spark] clownxc closed pull request #40626: [SPARK-42860][SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode
clownxc closed pull request #40626: [SPARK-42860][SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode URL: https://github.com/apache/spark/pull/40626
[GitHub] [spark] LuciferYang commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on PR #40564: URL: https://github.com/apache/spark/pull/40564#issuecomment-1492772726 The PR title should be `[SPARK-42519][CONNECT][TESTS] ...`
[GitHub] [spark] LuciferYang commented on pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on PR #40564: URL: https://github.com/apache/spark/pull/40564#issuecomment-1492773043 @zhenlineo Thanks for pinging me ~ Let me do some manual checks
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155033254 ## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala: ## @@ -434,7 +433,9 @@ abstract class InMemoryBaseTable( protected var streamingWriter: StreamingWrite = StreamingAppend override def overwriteDynamicPartitions(): WriteBuilder = { - assert(writer == Append) + if (writer != Append) { Review Comment: What is the reason for changing the exception type?
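As background to the question about the exception type, the following sketch contrasts Scala's `assert` with an explicit check. The `Mode` hierarchy, the method names, and the choice of `IllegalArgumentException` are all assumptions made for illustration; the quoted diff does not show which exception the new `if` branch throws.

```scala
object AssertVersusThrow {
  // Hypothetical writer modes standing in for the ones in InMemoryBaseTable.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode

  // Predef.assert throws java.lang.AssertionError when the condition is false.
  def withAssert(writer: Mode): Unit = assert(writer == Append)

  // An explicit check lets the code choose the exception type and message.
  // IllegalArgumentException is an assumption, not taken from the PR.
  def withCheck(writer: Mode): Unit =
    if (writer != Append) {
      throw new IllegalArgumentException(s"Unsupported writer mode: $writer")
    }

  // Runs body and reports the simple class name of whatever it throws, if anything.
  def failureOf(body: => Unit): Option[String] =
    try { body; None }
    catch { case t: Throwable => Some(t.getClass.getSimpleName) }

  def main(args: Array[String]): Unit = {
    println(failureOf(withAssert(Overwrite))) // Some(AssertionError)
    println(failureOf(withCheck(Overwrite)))  // Some(IllegalArgumentException)
    println(failureOf(withCheck(Append)))     // None
  }
}
```

One practical difference is that an `AssertionError` is a `java.lang.Error`, while an explicitly thrown exception is a regular `Exception`, so callers that only catch `Exception` will see the two cases differently.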
[GitHub] [spark] clownxc opened a new pull request, #40631: [SPARK-42860] [SQL] Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode
clownxc opened a new pull request, #40631: URL: https://github.com/apache/spark/pull/40631

### What changes were proposed in this pull request?

This PR proposes a 'ValidationMode' in org.apache.spark.sql.execution.ExplainMode that prints only the parsed and analysed logical plans.

### Why are the changes needed?

To validate the SQL query before submitting the job. We currently use `df.explain(extended=true)`, which generates the parsed, analysed, and optimised logical plans as well as the physical plan, but generating the optimised logical plan sometimes takes more time.

### Does this PR introduce *any* user-facing change?

Yes

### How was this patch tested?

Pass GitHub Actions
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155033747 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala: ## @@ -58,10 +58,15 @@ object SparkConnectServerUtils { private lazy val sparkConnect: Process = { debug("Starting the Spark Connect Server...") -val jar = findJar( +val connectJar = findJar( "connector/connect/server", "spark-connect-assembly", "spark-connect").getCanonicalPath +var driverClassPath = connectJar Review Comment: I think we can make `driverClassPath` a `val`, like:
```
val driverClassPath = if (IntegrationTestUtils.isCatalystTestJarAvailable) {
  connectJar + ":" + ...
} else connectJar
```
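A runnable version of the suggestion above might look like the sketch below. Because the original snippet elides what is appended to the class path, the `catalystTestJar` parameter and the jar names are hypothetical placeholders; the only point being shown is that `if`/`else` is an expression in Scala, so no `var` is needed.

```scala
object DriverClassPath {
  // `catalystTestJar` is a hypothetical parameter; None models "test jar not available".
  def build(connectJar: String, catalystTestJar: Option[String]): String = {
    // The if/else is an expression, so its value can be bound directly to a val.
    val driverClassPath =
      if (catalystTestJar.isDefined) connectJar + ":" + catalystTestJar.get
      else connectJar
    driverClassPath
  }

  def main(args: Array[String]): Unit = {
    println(build("connect.jar", Some("catalyst-tests.jar"))) // connect.jar:catalyst-tests.jar
    println(build("connect.jar", None))                       // connect.jar
  }
}
```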
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155034222 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) +withTable("testcat.myTableV2") { + spark.conf.set( Review Comment: Can we use `withSQLConf`?
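The motivation for preferring a scoped helper over a bare `spark.conf.set` can be sketched as follows. This is not Spark's `withSQLConf`; `ScopedConf`, `withConf`, and the in-memory `conf` map are hypothetical stand-ins that only demonstrate the set, run, restore pattern, so that a setting made for one test does not leak into the next.

```scala
import scala.collection.mutable

object ScopedConf {
  // Hypothetical in-memory stand-in for a session's runtime configuration.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Sets the given pairs, runs body, then restores the previous values even if body throws.
  def withConf[A](pairs: (String, String)*)(body: => A): A = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }

  def main(args: Array[String]): Unit = {
    val inside = withConf("catalog.impl" -> "in-memory") { conf("catalog.impl") }
    println(inside)                        // in-memory
    println(conf.contains("catalog.impl")) // false
  }
}
```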
[GitHub] [spark] zhengruifeng closed pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct
zhengruifeng closed pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct URL: https://github.com/apache/spark/pull/40627
[GitHub] [spark] zhengruifeng commented on pull request #40627: [SPARK-42998][CONNECT][PYTHON] Fix DataFrame.collect with null struct
zhengruifeng commented on PR #40627: URL: https://github.com/apache/spark/pull/40627#issuecomment-1492778516 Thanks, merged into master/branch-3.4
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155038497 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) Review Comment: When I run `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite" -Phive` locally, the new tests are always `IGNORED`. Would you mind refactoring the following code so that a local sbt run exercises them by default? https://github.com/apache/spark/blob/74cddcfda3ac4779de80696cdae2ba64d53fc635/project/SparkBuild.scala#L855-L857
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40607: [SPARK-42993][ML][CONNECT] Make PyTorch Distributor support Spark Connect
zhengruifeng commented on code in PR #40607: URL: https://github.com/apache/spark/pull/40607#discussion_r1155039367 ## python/pyspark/ml/torch/distributor.py: ## @@ -581,12 +590,12 @@ def _run_distributed_training( f"Started distributed training with {self.num_processes} executor proceses" Review Comment: thanks, will fix
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155039507 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) +withTable("testcat.myTableV2") { + spark.conf.set( Review Comment: Or should we make this the default config for `SimpleSparkConnectService` startup, so that we can skip this manual configuration? @zhenlineo WDYT?
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155039507 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) +withTable("testcat.myTableV2") { + spark.conf.set( Review Comment: @Hisoka-X Or should we make this the default config for `SimpleSparkConnectService` startup, so that we can skip this manual configuration? @zhenlineo WDYT?
[GitHub] [spark] LuciferYang commented on a diff in pull request #40564: [SPARK-42519] [Test] [Connect] Add More WriteTo Tests In Spark Connect Client
LuciferYang commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155039927 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) Review Comment: For example, add `(LocalProject("catalyst") / Test / Keys.`package`).value`?
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
dongjoon-hyun commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155040511 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) Review Comment: May I ask why we need to change the variable name from `x` to `element` for this bug fix?
[GitHub] [spark] dongjoon-hyun commented on pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4
dongjoon-hyun commented on PR #40555: URL: https://github.com/apache/spark/pull/40555#issuecomment-1492793200 Shall we close this PR? The Apache Parquet 1.12.4 vote seems to have failed due to a technical issue. For the record, the release manager created it from the wrong branch. - https://github.com/apache/parquet-mr/commit/22069e58494e7cb5d50e664c7ffa1cf1468404f8 ``` - 1.13.0-SNAPSHOT + 1.12.4 ```
[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155044709 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) Review Comment: The variable name is included in the error message because it will be part of the column path. Without this rename, the error message will refer to `array_col.x`, where `x` is not known to the user.
[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155044709 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) Review Comment: The variable name is included in the error message because it will be part of the column path. Without this rename, the error message will reference `array_col.x`, where `x` is not known to the user.
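The point about the lambda variable name leaking into the reported column path can be illustrated with a small sketch. It assumes, as the comment describes, that the path for an array element is the array column's path with the variable name appended; `ColumnPathSketch` and `elementPath` are hypothetical names, not part of `TableOutputResolver`.

```scala
object ColumnPathSketch {
  // Hypothetical model: the element's column path is the array column's path
  // plus the name given to the lambda variable, joined with dots.
  def elementPath(arrayColPath: Seq[String], lambdaVarName: String): String =
    (arrayColPath :+ lambdaVarName).mkString(".")
}
```

Under this assumption, `elementPath(Seq("array_col"), "x")` yields `array_col.x`, whereas naming the variable `element` yields `array_col.element`, which tells the user which part of the column is at fault.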
[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155045026 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) Review Comment: It is similar to what we already do in `DataType$canWrite()`.
[GitHub] [spark] Hisoka-X opened a new pull request, #40632: [SPARK-42298][SQL] Assign name to _LEGACY_ERROR_TEMP_2132
Hisoka-X opened a new pull request, #40632: URL: https://github.com/apache/spark/pull/40632 ### What changes were proposed in this pull request? This PR proposes to assign the name "CANNOT_PARSE_JSON_ARRAYS_AS_STRUCTS" to _LEGACY_ERROR_TEMP_2132. ### Why are the changes needed? Assign a proper name to LEGACY_ERROR_TEMP. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ./build/sbt "testOnly org.apache.spark.sql.errors.QueryExecutionErrorsSuite"
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40632: [SPARK-42298][SQL] Assign name to _LEGACY_ERROR_TEMP_2132
Hisoka-X commented on code in PR #40632: URL: https://github.com/apache/spark/pull/40632#discussion_r1155046085 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala: ## @@ -1404,8 +1404,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = { new SparkRuntimeException( - errorClass = "_LEGACY_ERROR_TEMP_2132", - messageParameters = Map.empty) + errorClass = "CANNOT_PARSE_JSON_ARRAYS_AS_STRUCTS", Review Comment: This error is never thrown to the user because it is intercepted by `MALFORMED_RECORD_IN_PARSING`, so I didn't add a test for it.
[GitHub] [spark] zhengruifeng commented on pull request #40563: [SPARK-41232][SPARK-41233][FOLLOWUP] Refactor `array_append` and `array_prepend` with `RuntimeReplaceable`
zhengruifeng commented on PR #40563: URL: https://github.com/apache/spark/pull/40563#issuecomment-1492810135 ping @cloud-fan
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client
Hisoka-X commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155047034 ## sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala: ## @@ -434,7 +433,9 @@ abstract class InMemoryBaseTable( protected var streamingWriter: StreamingWrite = StreamingAppend override def overwriteDynamicPartitions(): WriteBuilder = { - assert(writer == Append) + if (writer != Append) { Review Comment: The reason is that we want to use the catalyst test jar when submitting ConnectService, so that we can use `InMemoryTableCatalog`. But if `InMemoryTableCatalog` uses `scalatest`, we would have to add the `scalatest` jar to ConnectService too. So I removed it.
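The change described above, replacing a test-framework `assert` with a plain conditional so that the class no longer needs `scalatest` on the server's class path, might look roughly like the sketch below. The diff elides what happens inside the `if`, so the thrown `IllegalArgumentException`, its message, and the names `WriterModeSketch`, `Mode`, and `requireAppend` are all assumptions for illustration only.

```scala
object WriterModeSketch {
  // Hypothetical stand-ins for the writer modes referenced in the diff.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode

  // Uses only the standard library, so no test framework is needed at runtime.
  def requireAppend(writer: Mode): Unit = {
    if (writer != Append) {
      // Assumed exception type and message; the original diff does not show them.
      throw new IllegalArgumentException(s"Unsupported writer type: $writer")
    }
  }
}
```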
[GitHub] [spark] shrprasa commented on pull request #40258: [SPARK-42655][SQL] Incorrect ambiguous column reference error
shrprasa commented on PR #40258: URL: https://github.com/apache/spark/pull/40258#issuecomment-1492817996 @cloud-fan I have made the change. All tests have passed. Can you please review?
[GitHub] [spark] shrprasa commented on pull request #40128: [SPARK-42466][K8S]: Cleanup k8s upload directory when job terminates
shrprasa commented on PR #40128: URL: https://github.com/apache/spark/pull/40128#issuecomment-1492818493 @holdenk @dongjoon-hyun TTL-based clearance has limitations, as pointed out in this comment: https://github.com/apache/spark/pull/40363#issuecomment-1467439787
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client
Hisoka-X commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155054750 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala: ## @@ -58,10 +58,15 @@ object SparkConnectServerUtils { private lazy val sparkConnect: Process = { debug("Starting the Spark Connect Server...") -val jar = findJar( +val connectJar = findJar( "connector/connect/server", "spark-connect-assembly", "spark-connect").getCanonicalPath +var driverClassPath = connectJar Review Comment: Thanks for the advice. Done.
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client
Hisoka-X commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155054855 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) +withTable("testcat.myTableV2") { + spark.conf.set( Review Comment: I removed it and changed `RemoteSparkSession` to set the config `spark.sql.catalog.testcat` by default.
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40564: [SPARK-42519][CONNECT][TESTS] Add More WriteTo Tests In Spark Connect Client
Hisoka-X commented on code in PR #40564: URL: https://github.com/apache/spark/pull/40564#discussion_r1155054885 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala: ## @@ -220,41 +220,132 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper { } } + test("writeTo with create") { +assume(IntegrationTestUtils.isCatalystTestJarAvailable) Review Comment: Done.
[GitHub] [spark] wangyum closed pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4
wangyum closed pull request #40555: [SPARK-42926][BUILD][SQL] Upgrade Parquet to 1.12.4 URL: https://github.com/apache/spark/pull/40555
[GitHub] [spark] aokolnychyi commented on a diff in pull request #40630: [SPARK-42997][SQL] TableOutputResolver must use correct column paths in error messages for arrays and maps
aokolnychyi commented on code in PR #40630: URL: https://github.com/apache/spark/pull/40630#discussion_r1155045026 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala: ## @@ -179,8 +179,9 @@ object TableOutputResolver { addError(s"Cannot write nullable elements to array of non-nulls: '${colPath.quoted}'") None } else { - val param = NamedLambdaVariable("x", inputType.elementType, inputType.containsNull) - val fakeAttr = AttributeReference("x", expectedType.elementType, expectedType.containsNull)() + val param = NamedLambdaVariable("element", inputType.elementType, inputType.containsNull) Review Comment: The new logic is similar to what we already do in `DataType$canWrite()`.