[spark] branch branch-3.0 updated: [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2824fec9  [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`
2824fec9 is described below

commit 2824fec9fa57444b7c64edb8226cf75bb87a2e5d
Author: DB Tsai
AuthorDate: Fri Feb 14 21:46:01 2020 +

    [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

    ### What changes were proposed in this pull request?

    1. `InMemoryTable` was flattening the nested columns, and the flattened columns were then used to look up the field indices, which is not correct. This PR implements partitioning by nested columns for `InMemoryTable`.

    ### Why are the changes needed?

    This PR implements partitioning by nested columns for `InMemoryTable`, so we can test this feature in DSv2.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Existing unit tests and new tests.

    Closes #26929 from dbtsai/addTests.

    Authored-by: DB Tsai
    Signed-off-by: DB Tsai
    (cherry picked from commit d0f961476031b62bda0d4d41f7248295d651ea92)
    Signed-off-by: DB Tsai
---
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index c9e4e0a..0187ae3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -26,7 +26,7 @@ import org.scalatest.Assertions._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{IdentityTransform, NamedReference, Transform}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
@@ -59,10 +59,30 @@ class InMemoryTable(
 
   def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
 
-  private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
-  private val partIndexes = partFieldNames.map(schema.fieldIndex)
+  private val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
 
-  private def getKey(row: InternalRow): Seq[Any] = partIndexes.map(row.toSeq(schema)(_))
+  private def getKey(row: InternalRow): Seq[Any] = {
+    def extractor(fieldNames: Array[String], schema: StructType, row: InternalRow): Any = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        value
+      }
+    }
+    partCols.map(fieldNames => extractor(fieldNames, schema, row))
+  }
 
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
@@ -146,8 +166,10 @@ class InMemoryTable(
   }
 
   private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
-      val deleteKeys = InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+      val deleteKeys = InMemoryTable.filtersToKeys(
+        dataMap.keys, partCols.map(_.toSeq.quoted), filters)
       dataMap --= deleteKeys
       withData(messages.map(_.asInstanceOf[BufferedRows]))
     }
@@ -161,7 +183,8 @@ class InMemoryTable(
   }
 
   override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
-    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted),
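For context, the DSv2 write path this enables can be exercised roughly as follows. This is a hedged usage sketch, not code from the patch: the catalog name `testcat` and table name are placeholders, and a DSv2 catalog backed by the in-memory test table is assumed to be registered.

```scala
// Hedged sketch: creating a DSv2 table partitioned by a nested column.
// Assumes a DSv2 catalog registered as "testcat"; adjust names to your setup.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().appName("nested-partitioning-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1L, "us", "sf"), (2L, "eu", "ber"))
  .toDF("id", "country", "city")
  .select($"id", struct($"country", $"city").as("loc"))

// Partition by the nested field loc.country; with this patch the in-memory test table
// resolves the nested reference instead of flattening the schema first.
df.writeTo("testcat.table_name")
  .partitionedBy($"loc.country")
  .create()
```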
[spark] branch master updated (d273a2b -> d0f9614)
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from d273a2b  [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
     add d0f9614  [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)
[spark] branch master updated: [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d273a2b  [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support
d273a2b is described below

commit d273a2bb0fac452a97f5670edd69d3e452e3e57e
Author: Holden Karau
AuthorDate: Fri Feb 14 12:36:52 2020 -0800

    [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support

    This PR is based on an existing/previous PR - https://github.com/apache/spark/pull/19045

    ### What changes were proposed in this pull request?

    This change adds a decommissioning state that we can enter when the cloud provider/scheduler lets us know we aren't going to be removed immediately but instead will be removed soon. This concept fits nicely with K8s and also with spot instances on AWS / preemptible instances, all of which can give us a notice that our host is going away. For now we simply stop scheduling jobs; in the future we could perform some kind of migration of data during scale-down, or at least stop accepting new [...]

    There is a design document at https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing

    ### Why are the changes needed?

    With the move towards preemptible multi-tenancy, serverless environments, and spot instances, better handling of node scale-down is required.

    ### Does this PR introduce any user-facing change?

    There is no API change; however, an additional configuration flag is added to enable/disable this behaviour.

    ### How was this patch tested?

    New integration tests in the Spark K8s integration testing. Extension of the AppClientSuite to test decommissioning separately from K8s. A conceptual sketch of the decommissioning state follows after this email.

    Closes #26440 from holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.

    Lead-authored-by: Holden Karau
    Co-authored-by: Holden Karau
    Signed-off-by: Holden Karau
---
 .../org/apache/spark/deploy/DeployMessage.scala    | 11 ++
 .../org/apache/spark/deploy/ExecutorState.scala    | 8 +-
 .../spark/deploy/client/StandaloneAppClient.scala  | 2 +
 .../client/StandaloneAppClientListener.scala       | 2 +
 .../org/apache/spark/deploy/master/Master.scala    | 31 ++
 .../org/apache/spark/deploy/worker/Worker.scala    | 26 +
 .../executor/CoarseGrainedExecutorBackend.scala    | 39 ++-
 .../scala/org/apache/spark/executor/Executor.scala | 16 +++
 .../org/apache/spark/internal/config/Worker.scala  | 5 +
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +
 .../spark/scheduler/ExecutorLossReason.scala       | 8 ++
 .../scala/org/apache/spark/scheduler/Pool.scala    | 4 +
 .../org/apache/spark/scheduler/Schedulable.scala   | 1 +
 .../apache/spark/scheduler/SchedulerBackend.scala  | 3 +
 .../org/apache/spark/scheduler/TaskScheduler.scala | 5 +
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 5 +
 .../apache/spark/scheduler/TaskSetManager.scala    | 6 ++
 .../cluster/CoarseGrainedClusterMessage.scala      | 2 +
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 66 +++-
 .../cluster/StandaloneSchedulerBackend.scala       | 6 ++
 .../scala/org/apache/spark/util/SignalUtils.scala  | 2 +-
 .../spark/deploy/client/AppClientSuite.scala       | 39 ++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +
 .../scheduler/ExternalClusterManagerSuite.scala    | 1 +
 .../spark/scheduler/WorkerDecommissionSuite.scala  | 84 +++
 .../apache/spark/deploy/k8s/KubernetesConf.scala   | 3 +
 .../k8s/features/BasicExecutorFeatureStep.scala    | 20 +++-
 .../docker/src/main/dockerfiles/spark/Dockerfile   | 4 +-
 .../docker/src/main/dockerfiles/spark/decom.sh     | 35 ++
 .../src/main/dockerfiles/spark/entrypoint.sh       | 6 +-
 .../dev/dev-run-integration-tests.sh               | 9 +-
 .../k8s/integrationtest/DecommissionSuite.scala    | 49 +
 .../k8s/integrationtest/KubernetesSuite.scala      | 117 -
 .../integration-tests/tests/decommissioning.py     | 45 
 sbin/decommission-slave.sh                         | 57 ++
 sbin/spark-daemon.sh                               | 15 +++
 36 files changed, 690 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index fba371d..18305ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
+  /**
+   * @param id the worker id
+   * @param worker the worker endpoint ref
+   */
+  case class
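The core idea described above — receive a notice that the host is going away, flip into a decommissioning state, and refuse new work while letting running work finish — can be illustrated with a small, self-contained sketch. This is a conceptual illustration only, not Spark's scheduler code; all names below are made up for this example:

```scala
// Conceptual sketch of a decommissioning state (not Spark's implementation).
import java.util.concurrent.atomic.AtomicBoolean

final class DecommissionAwareWorker {
  // Flipped once when the cloud provider / scheduler signals the host is going away soon.
  private val decommissioning = new AtomicBoolean(false)

  def startDecommissioning(): Unit = decommissioning.set(true)

  /** Returns true if the task was accepted; false means "place it on another node". */
  def offerTask(task: Runnable): Boolean = {
    if (decommissioning.get()) {
      false // refuse new work; tasks already running are allowed to complete
    } else {
      new Thread(task).start()
      true
    }
  }
}
```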
[spark] branch branch-3.0 updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1385fc0  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver
1385fc0 is described below

commit 1385fc02ce7d28e6570971e1687e74d245a5533f
Author: HyukjinKwon
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

    [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

    ### What changes were proposed in this pull request?

    This PR addresses the comment at https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves the migration guide to explicitly note that the legacy environment variable has to be set in both executor and driver.

    ### Why are the changes needed?

    To clarify that this environment variable should be set in both the driver and the executors.

    ### Does this PR introduce any user-facing change?

    Nope.

    ### How was this patch tested?

    I checked it via a Markdown editor.

    Closes #27573 from HyukjinKwon/SPARK-29748.

    Authored-by: HyukjinKwon
    Signed-off-by: Shixiong Zhu
    (cherry picked from commit b343757b1bd5d0344b82f36aa4d65ed34f840606)
    Signed-off-by: Shixiong Zhu
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.
 
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used. For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above, and the order of fields will match that as entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above, and the order of fields will match that as entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - this environment variable must be consistent on all executors and driver; otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
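As a hedged illustration of what "set in both executor and driver" can look like in practice: the executor side can be covered through the standard `spark.executorEnv.*` configuration, while the driver process itself is assumed to have the variable exported in its own environment before starting. This is a sketch under those assumptions, not the only way to do it:

```scala
// Hedged sketch: one way to propagate the legacy flag to executors.
// Assumes PYSPARK_ROW_FIELD_SORTING_ENABLED=true is also exported in the environment
// of the driver process itself before it launches.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("legacy-row-field-ordering")
  .config("spark.executorEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED", "true") // executor side
  .getOrCreate()
```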
[spark] branch master updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b343757  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver
b343757 is described below

commit b343757b1bd5d0344b82f36aa4d65ed34f840606
Author: HyukjinKwon
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

    [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

    ### What changes were proposed in this pull request?

    This PR addresses the comment at https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves the migration guide to explicitly note that the legacy environment variable has to be set in both executor and driver.

    ### Why are the changes needed?

    To clarify that this environment variable should be set in both the driver and the executors.

    ### Does this PR introduce any user-facing change?

    Nope.

    ### How was this patch tested?

    I checked it via a Markdown editor.

    Closes #27573 from HyukjinKwon/SPARK-29748.

    Authored-by: HyukjinKwon
    Signed-off-by: Shixiong Zhu
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.
 
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used. For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above, and the order of fields will match that as entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above, and the order of fields will match that as entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - this environment variable must be consistent on all executors and driver; otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
[spark] branch branch-3.0 updated: [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a8d7a3  [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels
0a8d7a3 is described below

commit 0a8d7a35e24acbd7af57fe5169691afb8ccd8675
Author: Maxim Gekk
AuthorDate: Fri Feb 14 22:16:57 2020 +0800

    [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels

    ### What changes were proposed in this pull request?

    In the PR, I propose to use the Java 8 time API in timestamp truncations to the `HOUR` and `DAY` levels. The problem is in the usage of `timeZone.getOffset(millis)` in day/hour truncations, where the combined calendar (Julian + Gregorian) is used underneath.

    ### Why are the changes needed?

    The change fixes wrong truncations. For example, the following truncation to hours should print `0010-01-01 01:00:00`, but it outputs a wrong timestamp:
    ```scala
    Seq("0010-01-01 01:02:03.123456").toDF()
      .select($"value".cast("timestamp").as("ts"))
      .select(date_trunc("HOUR", $"ts").cast("string"))
      .show(false)
    +------------------------------------+
    |CAST(date_trunc(HOUR, ts) AS STRING)|
    +------------------------------------+
    |0010-01-01 01:30:17                 |
    +------------------------------------+
    ```

    ### Does this PR introduce any user-facing change?

    Yes. After the changes, the result of the example above is:
    ```scala
    +------------------------------------+
    |CAST(date_trunc(HOUR, ts) AS STRING)|
    +------------------------------------+
    |0010-01-01 01:00:00                 |
    +------------------------------------+
    ```

    ### How was this patch tested?

    - Added new test to `DateFunctionsSuite`
    - By `DateExpressionsSuite` and `DateTimeUtilsSuite`

    Closes #27512 from MaxGekk/fix-trunc-old-timestamp.

    Authored-by: Maxim Gekk
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 7137a6d065edeaab97bf5bf49ffaca3d060a14fe)
    Signed-off-by: Wenchen Fan
---
 .../catalyst/expressions/datetimeExpressions.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 44 +++---
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 39 +--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 +++
 4 files changed, 59 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index cf91489..adf7251 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1690,15 +1690,15 @@ case class TruncTimestamp(
 
   override def eval(input: InternalRow): Any = {
     evalHelper(input, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC) { (t: Any, level: Int) =>
-      DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, timeZone)
+      DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, zoneId)
     }
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val tz = ctx.addReferenceObj("timeZone", timeZone)
+    val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
     codeGenHelper(ctx, ev, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC, true) {
       (date: String, fmt: String) =>
-        s"truncTimestamp($date, $fmt, $tz);"
+        s"truncTimestamp($date, $fmt, $zid);"
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 01d36f1..ce0c138 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -711,32 +711,34 @@ object DateTimeUtils {
     }
   }
 
+  private def truncToUnit(t: SQLTimestamp, zoneId: ZoneId, unit: ChronoUnit): SQLTimestamp = {
+    val truncated = microsToInstant(t).atZone(zoneId).truncatedTo(unit)
+    instantToMicros(truncated.toInstant)
+  }
+
   /**
    * Returns the trunc date time from original date time and trunc level.
    * Trunc level should be generated using `parseTruncLevel()`, should be between 0 and 12.
    */
-  def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): SQLTimestamp = {
-    if (level == TRUNC_TO_MICROSECOND) return t
-    var millis = MICROSECONDS.toMillis(t)
-    val truncated = level match {
-      case TRUNC_TO_MILLISECOND => millis
-      case TRUNC_TO_SECOND =>
-        millis - millis %
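The gist of the fix is switching from millisecond arithmetic with `TimeZone.getOffset` to `java.time` truncation on the proleptic Gregorian calendar. A minimal, self-contained sketch of that idea follows; it is not Spark's internal `DateTimeUtils`, just the same `java.time` calls on a microsecond timestamp:

```scala
// Minimal sketch: truncating a microsecond timestamp to the hour with java.time.
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

def truncToHour(micros: Long, zoneId: ZoneId): Long = {
  val instant = Instant.EPOCH.plus(micros, ChronoUnit.MICROS)
  // Truncate in the given time zone, on the proleptic Gregorian calendar.
  val truncated = instant.atZone(zoneId).truncatedTo(ChronoUnit.HOURS).toInstant
  ChronoUnit.MICROS.between(Instant.EPOCH, truncated)
}

// For the problematic example above, old timestamps now truncate cleanly,
// e.g. 0010-01-01 01:02:03.123456 -> 0010-01-01 01:00:00 in the session zone.
```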
[spark] branch master updated (2a270a7 -> 7137a6d)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 2a270a7  [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
     add 7137a6d  [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels

No new revisions were added by this update.

Summary of changes:
 .../catalyst/expressions/datetimeExpressions.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 44 +++---
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 39 +--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 +++
 4 files changed, 59 insertions(+), 43 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 79ce792  [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API
79ce792 is described below

commit 79ce79234f02092e22fdd79e859d83f5a174ef95
Author: HyukjinKwon
AuthorDate: Fri Feb 14 18:20:18 2020 +0800

    [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API

    ### What changes were proposed in this pull request?

    This PR fixes the `DataFrameReader.csv(dataset: Dataset[String])` API so that it accepts a `Dataset[String]` originating from a column named something other than `value`. This is a long-standing bug that has existed since the very first implementation.

    `CSVUtils.filterCommentAndEmpty` assumed the `Dataset[String]` originated from a column named `value`. This PR changes it to use the first column name in the schema.

    ### Why are the changes needed?

    For `DataFrameReader.csv(dataset: Dataset[String])` to support any `Dataset[String]`, as the signature indicates.

    ### Does this PR introduce any user-facing change?

    Yes,
    ```scala
    val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String]
    spark.read.option("header", true).option("inferSchema", true).csv(ds).show()
    ```
    Before:
    ```
    org.apache.spark.sql.AnalysisException: cannot resolve '`value`' given input columns: [text];;
    'Filter (length(trim('value, None)) > 0)
    +- Project [concat(a,b,, cast(id#0L as string)) AS text#2]
       +- Range (0, 2, step=1, splits=Some(2))
    ```
    After:
    ```
    +---+---+---+
    |  a|  b|  0|
    +---+---+---+
    |  a|  b|  1|
    +---+---+---+
    ```

    ### How was this patch tested?

    A unit test was added.

    Closes #27561 from HyukjinKwon/SPARK-30810.

    Authored-by: HyukjinKwon
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 2a270a731a3b1da9a0fb036d648dd522e5c4d5ad)
    Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala | 7 ---
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 7 +++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 21fabac..d8b52c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -33,11 +33,12 @@ object CSVUtils {
     // with the one below, `filterCommentAndEmpty` but execution path is different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
-    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
+    val aliased = lines.toDF("value")
+    val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString))
+      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
     } else {
-      nonEmptyLines
+      nonEmptyLines.as[String]
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b1105b4..0be0e1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2294,6 +2294,13 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
       }
     }
   }
+
+  test("SPARK-30810: parses and convert a CSV Dataset having different column from 'value'") {
+    val ds = spark.range(2).selectExpr("concat('a,b,', id) AS `a.text`").as[String]
+    val csv = spark.read.option("header", true).option("inferSchema", true).csv(ds)
+    assert(csv.schema.fieldNames === Seq("a", "b", "0"))
+    checkAnswer(csv, Row("a", "b", 1))
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
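The fix boils down to normalizing the lone input column's name before referencing it. A hedged, standalone sketch of that pattern follows; `dropEmptyLines` is an illustrative helper written for this email, not a Spark API:

```scala
// Sketch of the aliasing pattern: rename the single column to a known name before filtering,
// so the logic no longer depends on what the caller named it.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, length, trim}

def dropEmptyLines(lines: Dataset[String]): Dataset[String] = {
  import lines.sparkSession.implicits._
  lines.toDF("value")                       // works whether the input column was "value", "text", ...
    .filter(length(trim(col("value"))) > 0) // drop blank lines
    .as[String]
}
```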
[spark] branch master updated (99b8136 -> 2a270a7)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 99b8136  [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly
     add 2a270a7  [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala | 7 ---
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 7 +++
 2 files changed, 11 insertions(+), 3 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0dcc4df  [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly
0dcc4df is described below

commit 0dcc4df0ca5dba8ae09388b95969080ca28cbe16
Author: yi.wu
AuthorDate: Fri Feb 14 16:52:28 2020 +0800

    [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly

    ### What changes were proposed in this pull request?

    We should convert Spark `InternalRow`s to Hive data via `HiveInspectors.wrapperFor`.

    ### Why are the changes needed?

    Without this change, we may hit the exception below:
    ```
    [info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, 192.168.1.6, executor driver): java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to org.apache.hadoop.hive.common.type.HiveDecimal
    [info]  at org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector.getPrimitiveJavaObject(JavaHiveDecimalObjectInspector.java:55)
    [info]  at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:321)
    [info]  at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292)
    [info]  at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247)
    [info]  at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231)
    [info]  at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:55)
    [info]  at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$2(ScriptTransformationExec.scala:300)
    [info]  at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$2$adapted(ScriptTransformationExec.scala:281)
    [info]  at scala.collection.Iterator.foreach(Iterator.scala:941)
    [info]  at scala.collection.Iterator.foreach$(Iterator.scala:941)
    [info]  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    [info]  at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$1(ScriptTransformationExec.scala:281)
    [info]  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    [info]  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
    [info]  at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformationExec.scala:270)
    ```

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a new test. Note that this test returns different results between Hive 1.2 and Hive 2.3 due to a `HiveDecimal` or `SerDe` difference (the root cause is not known yet).

    Closes #27556 from Ngone51/script_transform.

    Lead-authored-by: yi.wu
    Co-authored-by: Wenchen Fan
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 99b8136a86030411e6bcbd312f40eb2a901ab0f0)
    Signed-off-by: Wenchen Fan
---
 .../hive/execution/ScriptTransformationExec.scala  | 32 +--
 sql/hive/src/test/resources/test_script.py         | 21 ++
 .../hive/execution/ScriptTransformationSuite.scala | 46 +-
 3 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index e12f663..40f7b4e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -94,9 +94,8 @@ case class ScriptTransformationExec(
     // This new thread will consume the ScriptTransformation's input rows and write them to the
     // external process. That process's output will be read by this current thread.
     val writerThread = new ScriptTransformationWriterThread(
-      inputIterator,
+      inputIterator.map(outputProjection),
       input.map(_.dataType),
-      outputProjection,
       inputSerde,
       inputSoi,
       ioschema,
@@ -249,16 +248,15 @@ case class ScriptTransformationExec(
 private class ScriptTransformationWriterThread(
     iter: Iterator[InternalRow],
     inputSchema: Seq[DataType],
-    outputProjection: Projection,
     @Nullable inputSerde: AbstractSerDe,
-    @Nullable inputSoi: ObjectInspector,
+    @Nullable inputSoi: StructObjectInspector,
     ioschema:
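For readers unfamiliar with the feature, the kind of query this patch fixes is a Hive-style script transformation over non-string columns (for example decimals), which must be serialized through the Hive SerDe before reaching the external script. The following is a hedged sketch of such a query: `test_script.py` stands in for any executable that echoes its tab-separated stdin, and a session with Hive support enabled is assumed.

```scala
// Hedged sketch: a script transformation over a decimal column (requires enableHiveSupport()).
// "test_script.py" is a placeholder for any executable that echoes its tab-separated input.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("script-transform-sketch")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("""
  SELECT TRANSFORM(a, b)
  USING 'python test_script.py'
  AS (a string, b string)
  FROM VALUES (CAST(1.23 AS DECIMAL(10, 2)), 'x') AS t(a, b)
""").show()
```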
[spark] branch master updated (b2134ee -> 99b8136)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from b2134ee  [SPARK-30823][PYTHON][DOCS] Set `%PYTHONPATH%` when building PySpark documentation on Windows
     add 99b8136  [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly

No new revisions were added by this update.

Summary of changes:
 .../hive/execution/ScriptTransformationExec.scala  | 32 +--
 .../hive/src/test/resources/test_script.py         | 11 +++---
 .../hive/execution/ScriptTransformationSuite.scala | 46 +-
 3 files changed, 69 insertions(+), 20 deletions(-)
 copy dev/scalafmt => sql/hive/src/test/resources/test_script.py (84%)
 mode change 100755 => 100644