[spark] branch branch-3.0 updated: [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

2020-02-14 Thread dbtsai
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 2824fec9 [SPARK-30289][SQL] Partitioned by Nested Column for 
`InMemoryTable`
2824fec9 is described below

commit 2824fec9fa57444b7c64edb8226cf75bb87a2e5d
Author: DB Tsai 
AuthorDate: Fri Feb 14 21:46:01 2020 +

[SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

### What changes were proposed in this pull request?
`InMemoryTable` was flattening the nested columns, and the flattened column
names were then used to look up field indices, which is not correct.

This PR implements partitioning by nested columns for `InMemoryTable`.
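
As a hedged illustration only (not code from this patch), partitioning by a
nested column through the DSv2 `DataFrameWriterV2` API looks roughly like the
sketch below; the catalog class name and the `testcat.ns.t` identifier are
assumptions made for the example.

```scala
// Illustration only: partition a DSv2 table by a field nested inside a struct.
// The in-memory catalog class and table identifier are assumed names.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Assumes the test-only in-memory catalog touched by this patch is on the classpath.
spark.conf.set("spark.sql.catalog.testcat",
  "org.apache.spark.sql.connector.InMemoryTableCatalog")

spark.range(3)
  .withColumn("loc", struct($"id".as("region"), ($"id" % 2).as("zone")))
  .writeTo("testcat.ns.t")
  .partitionedBy($"loc.zone")   // partition by the nested field loc.zone
  .create()
```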

### Why are the changes needed?

This PR implements partitioning by nested columns for `InMemoryTable`, so we
can test this feature in DSv2.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests and new tests.

Closes #26929 from dbtsai/addTests.

Authored-by: DB Tsai 
Signed-off-by: DB Tsai 
(cherry picked from commit d0f961476031b62bda0d4d41f7248295d651ea92)
Signed-off-by: DB Tsai 
---
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index c9e4e0a..0187ae3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -26,7 +26,7 @@ import org.scalatest.Assertions._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{IdentityTransform, 
Transform}
+import org.apache.spark.sql.connector.expressions.{IdentityTransform, 
NamedReference, Transform}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
@@ -59,10 +59,30 @@ class InMemoryTable(
 
   def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
 
-  private val partFieldNames = 
partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
-  private val partIndexes = partFieldNames.map(schema.fieldIndex)
+  private val partCols: Array[Array[String]] = 
partitioning.flatMap(_.references).map { ref =>
+schema.findNestedField(ref.fieldNames(), includeCollections = false) match 
{
+  case Some(_) => ref.fieldNames()
+  case None => throw new IllegalArgumentException(s"${ref.describe()} does 
not exist.")
+}
+  }
 
-  private def getKey(row: InternalRow): Seq[Any] = 
partIndexes.map(row.toSeq(schema)(_))
+  private def getKey(row: InternalRow): Seq[Any] = {
+def extractor(fieldNames: Array[String], schema: StructType, row: 
InternalRow): Any = {
+  val index = schema.fieldIndex(fieldNames(0))
+  val value = row.toSeq(schema).apply(index)
+  if (fieldNames.length > 1) {
+(value, schema(index).dataType) match {
+  case (row: InternalRow, nestedSchema: StructType) =>
+extractor(fieldNames.drop(1), nestedSchema, row)
+  case (_, dataType) =>
+throw new IllegalArgumentException(s"Unsupported type, 
${dataType.simpleString}")
+}
+  } else {
+value
+  }
+}
+partCols.map(fieldNames => extractor(fieldNames, schema, row))
+  }
 
   def withData(data: Array[BufferedRows]): InMemoryTable = 
dataMap.synchronized {
 data.foreach(_.rows.foreach { row =>
@@ -146,8 +166,10 @@ class InMemoryTable(
   }
 
   private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 override def commit(messages: Array[WriterCommitMessage]): Unit = 
dataMap.synchronized {
-  val deleteKeys = InMemoryTable.filtersToKeys(dataMap.keys, 
partFieldNames, filters)
+  val deleteKeys = InMemoryTable.filtersToKeys(
+dataMap.keys, partCols.map(_.toSeq.quoted), filters)
   dataMap --= deleteKeys
   withData(messages.map(_.asInstanceOf[BufferedRows]))
 }
@@ -161,7 +183,8 @@ class InMemoryTable(
   }
 
   override def deleteWhere(filters: Array[Filter]): Unit = 
dataMap.synchronized {
-dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, 
filters)
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, 
partCols.map(_.toSeq.quoted), 

[spark] branch master updated (d273a2b -> d0f9614)

2020-02-14 Thread dbtsai
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d273a2b  [SPARK-20628][CORE][K8S] Start to improve Spark 
decommissioning & preemption support
 add d0f9614  [SPARK-30289][SQL] Partitioned by Nested Column for 
`InMemoryTable`

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & preemption support

2020-02-14 Thread holden
This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d273a2b  [SPARK-20628][CORE][K8S] Start to improve Spark 
decommissioning & preemption support
d273a2b is described below

commit d273a2bb0fac452a97f5670edd69d3e452e3e57e
Author: Holden Karau 
AuthorDate: Fri Feb 14 12:36:52 2020 -0800

[SPARK-20628][CORE][K8S] Start to improve Spark decommissioning & 
preemption support

This PR is based on an existing/previous PR: https://github.com/apache/spark/pull/19045

### What changes were proposed in this pull request?

This change adds a decommissioning state that we can enter when the cloud
provider/scheduler lets us know we aren't going to be removed immediately but
instead will be removed soon. This concept fits nicely in K8s and also with
spot instances on AWS / preemptible instances, all of which can give us a notice
that our host is going away. For now we simply stop scheduling jobs; in the
future we could perform some kind of migration of data during scale-down, or at
least stop accepting new [...]

There is a design document at 
https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?usp=sharing
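
As a rough, hedged sketch of the idea (not the patch's actual classes), the
scheduler-side behaviour amounts to marking an executor as decommissioning and
excluding it from new task placement:

```scala
// Illustrative sketch only: once a decommission notice arrives for a host,
// stop offering it new tasks while letting running work finish.
final case class ExecutorInfo(id: String, var decommissioning: Boolean = false)

class SimpleScheduler(executors: Seq[ExecutorInfo]) {
  // Called when the cloud provider / cluster manager signals imminent removal.
  def decommission(execId: String): Unit =
    executors.find(_.id == execId).foreach(_.decommissioning = true)

  // Only executors that are not being decommissioned get new tasks.
  def candidatesForNewTasks: Seq[ExecutorInfo] =
    executors.filterNot(_.decommissioning)
}
```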

### Why are the changes needed?

With the move toward preemptible multi-tenancy, serverless environments, and
spot instances, better handling of node scale-down is required.

### Does this PR introduce any user-facing change?

There is no API change; however, an additional configuration flag is added
to enable/disable this behaviour.

### How was this patch tested?

New integration tests in the Spark K8s integration testing, and an extension of
the AppClientSuite to test decommissioning separately from K8s.

Closes #26440 from 
holdenk/SPARK-20628-keep-track-of-nodes-which-are-going-to-be-shutdown-r4.

Lead-authored-by: Holden Karau 
Co-authored-by: Holden Karau 
Signed-off-by: Holden Karau 
---
 .../org/apache/spark/deploy/DeployMessage.scala|  11 ++
 .../org/apache/spark/deploy/ExecutorState.scala|   8 +-
 .../spark/deploy/client/StandaloneAppClient.scala  |   2 +
 .../client/StandaloneAppClientListener.scala   |   2 +
 .../org/apache/spark/deploy/master/Master.scala|  31 ++
 .../org/apache/spark/deploy/worker/Worker.scala|  26 +
 .../executor/CoarseGrainedExecutorBackend.scala|  39 ++-
 .../scala/org/apache/spark/executor/Executor.scala |  16 +++
 .../org/apache/spark/internal/config/Worker.scala  |   5 +
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   2 +
 .../spark/scheduler/ExecutorLossReason.scala   |   8 ++
 .../scala/org/apache/spark/scheduler/Pool.scala|   4 +
 .../org/apache/spark/scheduler/Schedulable.scala   |   1 +
 .../apache/spark/scheduler/SchedulerBackend.scala  |   3 +
 .../org/apache/spark/scheduler/TaskScheduler.scala |   5 +
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |   5 +
 .../apache/spark/scheduler/TaskSetManager.scala|   6 ++
 .../cluster/CoarseGrainedClusterMessage.scala  |   2 +
 .../cluster/CoarseGrainedSchedulerBackend.scala|  66 +++-
 .../cluster/StandaloneSchedulerBackend.scala   |   6 ++
 .../scala/org/apache/spark/util/SignalUtils.scala  |   2 +-
 .../spark/deploy/client/AppClientSuite.scala   |  39 ++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |   2 +
 .../scheduler/ExternalClusterManagerSuite.scala|   1 +
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  84 +++
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |   3 +
 .../k8s/features/BasicExecutorFeatureStep.scala|  20 +++-
 .../docker/src/main/dockerfiles/spark/Dockerfile   |   4 +-
 .../docker/src/main/dockerfiles/spark/decom.sh |  35 ++
 .../src/main/dockerfiles/spark/entrypoint.sh   |   6 +-
 .../dev/dev-run-integration-tests.sh   |   9 +-
 .../k8s/integrationtest/DecommissionSuite.scala|  49 +
 .../k8s/integrationtest/KubernetesSuite.scala  | 117 -
 .../integration-tests/tests/decommissioning.py |  45 
 sbin/decommission-slave.sh |  57 ++
 sbin/spark-daemon.sh   |  15 +++
 36 files changed, 690 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index fba371d..18305ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
 assert (port > 0)
   }
 
+  /**
+   * @param id the worker id
+   * @param worker the worker endpoint ref
+   */
+  case class 

[spark] branch branch-3.0 updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

2020-02-14 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1385fc0  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy 
environment variable to set in both executor and driver
1385fc0 is described below

commit 1385fc02ce7d28e6570971e1687e74d245a5533f
Author: HyukjinKwon 
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

[SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment 
variable to set in both executor and driver

### What changes were proposed in this pull request?

This PR addresses the comment at
https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves
the migration guide to explicitly note that the legacy environment variable
must be set in both the executors and the driver.

### Why are the changes needed?

To clarify that this environment variable should be set in both the driver and the executors.
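
As a hedged example of what "both executors and driver" can mean in practice
(deployment-specific; not from this patch): the executor side can be propagated
through the standard `spark.executorEnv.*` prefix, while the driver side must be
exported in the driver's own environment before launch.

```scala
// Sketch only: propagate the legacy flag to executor processes. The driver
// process itself still needs PYSPARK_ROW_FIELD_SORTING_ENABLED exported in its
// environment (e.g. before spark-submit), which is deployment-specific.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executorEnv.PYSPARK_ROW_FIELD_SORTING_ENABLED", "true")
  .getOrCreate()
```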

### Does this PR introduce any user-facing change?

Nope.

### How was this patch tested?

I checked it via a Markdown editor.

Closes #27573 from HyukjinKwon/SPARK-29748.

Authored-by: HyukjinKwon 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit b343757b1bd5d0344b82f36aa4d65ed34f840606)
Signed-off-by: Shixiong Zhu 
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call 
`Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, 
the indexing operator should be used.
 For example, `map_col.getItem(col('id'))` should be replaced with 
`map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 
3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - 
this environment variable must be consistent on all executors and driver; 
otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment variable to set in both executor and driver

2020-02-14 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b343757  [SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy 
environment variable to set in both executor and driver
b343757 is described below

commit b343757b1bd5d0344b82f36aa4d65ed34f840606
Author: HyukjinKwon 
AuthorDate: Fri Feb 14 10:18:08 2020 -0800

[SPARK-29748][DOCS][FOLLOW-UP] Add a note that the legacy environment 
variable to set in both executor and driver

### What changes were proposed in this pull request?

This PR addresses the comment at
https://github.com/apache/spark/pull/26496#discussion_r379194091 and improves
the migration guide to explicitly note that the legacy environment variable
must be set in both the executors and the driver.

### Why are the changes needed?

To clarify that this environment variable should be set in both the driver and the executors.

### Does this PR introduce any user-facing change?

Nope.

### How was this patch tested?

I checked it via a Markdown editor.

Closes #27573 from HyukjinKwon/SPARK-29748.

Authored-by: HyukjinKwon 
Signed-off-by: Shixiong Zhu 
---
 docs/pyspark-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 8ea4fec..f7f2038 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -87,7 +87,7 @@ Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.
   - Since Spark 3.0, `Column.getItem` is fixed such that it does not call 
`Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, 
the indexing operator should be used.
 For example, `map_col.getItem(col('id'))` should be replaced with 
`map_col[col('id')]`.
 
-  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 
3.6, the field names will be sorted alphabetically as the only option.
+  - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when 
constructing with named arguments for Python versions 3.6 and above, and the 
order of fields will match that as entered. To enable sorted fields by default, 
as in Spark 2.4, set the environment variable 
`PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true" for both executors and driver - 
this environment variable must be consistent on all executors and driver; 
otherwise, it may cause failures or incorrect answers. For [...]
 
 ## Upgrading from PySpark 2.3 to 2.4
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` levels

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 0a8d7a3  [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` 
and `DAY` levels
0a8d7a3 is described below

commit 0a8d7a35e24acbd7af57fe5169691afb8ccd8675
Author: Maxim Gekk 
AuthorDate: Fri Feb 14 22:16:57 2020 +0800

[SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` and `DAY` 
levels

### What changes were proposed in this pull request?
In this PR, I propose to use the Java 8 time API for timestamp truncations to the
`HOUR` and `DAY` levels. The problem is in the usage of
`timeZone.getOffset(millis)` in day/hour truncations, where the combined
(Julian + Gregorian) calendar is used underneath.
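
A minimal sketch of that approach (simplified names, not the exact code in the
diff below), truncating microseconds since the epoch to the hour with `java.time`:

```scala
// Simplified sketch: truncate a microsecond timestamp to the hour in a given
// time zone using the Java 8 time API instead of TimeZone.getOffset.
import java.time.{Instant, ZoneId}
import java.time.temporal.ChronoUnit

def truncateToHour(micros: Long, zoneId: ZoneId): Long = {
  val instant   = Instant.EPOCH.plusNanos(micros * 1000L)
  val truncated = instant.atZone(zoneId).truncatedTo(ChronoUnit.HOURS).toInstant
  truncated.getEpochSecond * 1000000L + truncated.getNano / 1000L
}
```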

### Why are the changes needed?
The change fixes wrong truncations. For example, the following truncation to
hours should print `0010-01-01 01:00:00`, but it outputs a wrong timestamp:
```scala
Seq("0010-01-01 01:02:03.123456").toDF()
.select($"value".cast("timestamp").as("ts"))
.select(date_trunc("HOUR", $"ts").cast("string"))
.show(false)
++
|CAST(date_trunc(HOUR, ts) AS STRING)|
++
|0010-01-01 01:30:17 |
++
```

### Does this PR introduce any user-facing change?
Yes. After the changes, the result of the example above is:
```scala
+------------------------------------+
|CAST(date_trunc(HOUR, ts) AS STRING)|
+------------------------------------+
|0010-01-01 01:00:00                 |
+------------------------------------+
```

### How was this patch tested?
- Added new test to `DateFunctionsSuite`
- By `DateExpressionsSuite` and `DateTimeUtilsSuite`

Closes #27512 from MaxGekk/fix-trunc-old-timestamp.

Authored-by: Maxim Gekk 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 7137a6d065edeaab97bf5bf49ffaca3d060a14fe)
Signed-off-by: Wenchen Fan 
---
 .../catalyst/expressions/datetimeExpressions.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala| 44 +++---
 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 39 +--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 +++
 4 files changed, 59 insertions(+), 43 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index cf91489..adf7251 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -1690,15 +1690,15 @@ case class TruncTimestamp(
 
   override def eval(input: InternalRow): Any = {
 evalHelper(input, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC) { (t: Any, 
level: Int) =>
-  DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, timeZone)
+  DateTimeUtils.truncTimestamp(t.asInstanceOf[Long], level, zoneId)
 }
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-val tz = ctx.addReferenceObj("timeZone", timeZone)
+val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
 codeGenHelper(ctx, ev, minLevel = MIN_LEVEL_OF_TIMESTAMP_TRUNC, true) {
   (date: String, fmt: String) =>
-s"truncTimestamp($date, $fmt, $tz);"
+s"truncTimestamp($date, $fmt, $zid);"
 }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 01d36f1..ce0c138 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -711,32 +711,34 @@ object DateTimeUtils {
 }
   }
 
+  private def truncToUnit(t: SQLTimestamp, zoneId: ZoneId, unit: ChronoUnit): 
SQLTimestamp = {
+val truncated = microsToInstant(t).atZone(zoneId).truncatedTo(unit)
+instantToMicros(truncated.toInstant)
+  }
+
   /**
* Returns the trunc date time from original date time and trunc level.
* Trunc level should be generated using `parseTruncLevel()`, should be 
between 0 and 12.
*/
-  def truncTimestamp(t: SQLTimestamp, level: Int, timeZone: TimeZone): 
SQLTimestamp = {
-if (level == TRUNC_TO_MICROSECOND) return t
-var millis = MICROSECONDS.toMillis(t)
-val truncated = level match {
-  case TRUNC_TO_MILLISECOND => millis
-  case TRUNC_TO_SECOND =>
-millis - millis % 

[spark] branch master updated (2a270a7 -> 7137a6d)

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 2a270a7  [SPARK-30810][SQL] Parses and convert a CSV Dataset having 
different column from 'value' in csv(dataset) API
 add 7137a6d  [SPARK-30766][SQL] Fix the timestamp truncation to the `HOUR` 
and `DAY` levels

No new revisions were added by this update.

Summary of changes:
 .../catalyst/expressions/datetimeExpressions.scala |  6 +--
 .../spark/sql/catalyst/util/DateTimeUtils.scala| 44 +++---
 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 39 +--
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 13 +++
 4 files changed, 59 insertions(+), 43 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column from 'value' in csv(dataset) API

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 79ce792  [SPARK-30810][SQL] Parses and convert a CSV Dataset having 
different column from 'value' in csv(dataset) API
79ce792 is described below

commit 79ce79234f02092e22fdd79e859d83f5a174ef95
Author: HyukjinKwon 
AuthorDate: Fri Feb 14 18:20:18 2020 +0800

[SPARK-30810][SQL] Parses and convert a CSV Dataset having different column 
from 'value' in csv(dataset) API

### What changes were proposed in this pull request?

This PR fixes the `DataFrameReader.csv(dataset: Dataset[String])` API to take a
`Dataset[String]` originating from a column named something other than `value`.
This is a long-standing bug that has existed from the very first place.

`CSVUtils.filterCommentAndEmpty` assumed the `Dataset[String]` originated from a
`value` column. This PR changes it to use the first column name in the schema.
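
The shape of the fix, as a simplified sketch (the real change lives in
`CSVUtils.filterCommentAndEmpty` and also handles the comment option):

```scala
// Simplified sketch: a Dataset[String] may carry any column name, so alias its
// single column to a known name before building column expressions against it.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{length, trim}

def nonEmptyLines(lines: Dataset[String]): Dataset[String] = {
  val spark = lines.sparkSession
  import spark.implicits._
  lines.toDF("value")                      // normalize the column name
    .filter(length(trim($"value")) > 0)    // drop blank lines
    .as[String]
}
```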

### Why are the changes needed?

For `DataFrameReader.csv(dataset: Dataset[String])` to support any
`Dataset[String]`, as the signature indicates.

### Does this PR introduce any user-facing change?
Yes,

```scala
val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String]
spark.read.option("header", true).option("inferSchema", true).csv(ds).show()
```

Before:

```
org.apache.spark.sql.AnalysisException: cannot resolve '`value`' given 
input columns: [text];;
'Filter (length(trim('value, None)) > 0)
+- Project [concat(a,b,, cast(id#0L as string)) AS text#2]
   +- Range (0, 2, step=1, splits=Some(2))
```

After:

```
+---+---+---+
|  a|  b|  0|
+---+---+---+
|  a|  b|  1|
+---+---+---+
```

### How was this patch tested?

A unit test was added.

Closes #27561 from HyukjinKwon/SPARK-30810.

Authored-by: HyukjinKwon 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 2a270a731a3b1da9a0fb036d648dd522e5c4d5ad)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala  | 7 ---
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  | 7 +++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 21fabac..d8b52c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -33,11 +33,12 @@ object CSVUtils {
 // with the one below, `filterCommentAndEmpty` but execution path is 
different. One of them
 // might have to be removed in the near future if possible.
 import lines.sqlContext.implicits._
-val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
+val aliased = lines.toDF("value")
+val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
 if (options.isCommentSet) {
-  nonEmptyLines.filter(!$"value".startsWith(options.comment.toString))
+  
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
 } else {
-  nonEmptyLines
+  nonEmptyLines.as[String]
 }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b1105b4..0be0e1e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2294,6 +2294,13 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
   }
 }
   }
+
+  test("SPARK-30810: parses and convert a CSV Dataset having different column 
from 'value'") {
+val ds = spark.range(2).selectExpr("concat('a,b,', id) AS 
`a.text`").as[String]
+val csv = spark.read.option("header", true).option("inferSchema", 
true).csv(ds)
+assert(csv.schema.fieldNames === Seq("a", "b", "0"))
+checkAnswer(csv, Row("a", "b", 1))
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (99b8136 -> 2a270a7)

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 99b8136  [SPARK-25990][SQL] ScriptTransformation should handle 
different data types correctly
 add 2a270a7  [SPARK-30810][SQL] Parses and convert a CSV Dataset having 
different column from 'value' in csv(dataset) API

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala  | 7 ---
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  | 7 +++
 2 files changed, 11 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-25990][SQL] ScriptTransformation should handle different data types correctly

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 0dcc4df  [SPARK-25990][SQL] ScriptTransformation should handle 
different data types correctly
0dcc4df is described below

commit 0dcc4df0ca5dba8ae09388b95969080ca28cbe16
Author: yi.wu 
AuthorDate: Fri Feb 14 16:52:28 2020 +0800

[SPARK-25990][SQL] ScriptTransformation should handle different data types 
correctly

### What changes were proposed in this pull request?

We should convert Spark `InternalRow` values to Hive data via
`HiveInspectors.wrapperFor`.
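
To illustrate the kind of conversion `wrapperFor` performs (a hedged, simplified
sketch, not the patch itself): Spark's `Decimal` must be turned into Hive's
`HiveDecimal` before the Hive SerDe serializes it.

```scala
// Sketch only: the Hive SerDe expects Hive's own value types, so a Spark
// Decimal has to be wrapped before serialization; handing it over unwrapped
// causes the ClassCastException shown in the stack trace below.
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.spark.sql.types.Decimal

def toHiveDecimal(d: Decimal): HiveDecimal =
  HiveDecimal.create(d.toJavaBigDecimal)
```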

### Why are the changes needed?

We may hit the exception below without this change:

```
[info]org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 
in stage 1.0 (TID 1, 192.168.1.6, executor driver): 
java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast 
to org.apache.hadoop.hive.common.type.HiveDecimal
[info]  at 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector.getPrimitiveJavaObject(JavaHiveDecimalObjectInspector.java:55)
[info]  at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:321)
[info]  at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:292)
[info]  at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:247)
[info]  at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doSerialize(LazySimpleSerDe.java:231)
[info]  at 
org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.serialize(AbstractEncodingAwareSerDe.java:55)
[info]  at 
org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$2(ScriptTransformationExec.scala:300)
[info]  at 
org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$2$adapted(ScriptTransformationExec.scala:281)
[info]  at scala.collection.Iterator.foreach(Iterator.scala:941)
[info]  at scala.collection.Iterator.foreach$(Iterator.scala:941)
[info]  at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
[info]  at 
org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.$anonfun$run$1(ScriptTransformationExec.scala:281)
[info]  at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]  at 
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
[info]  at 
org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformationExec.scala:270)
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a new test. But please note that this test returns different results
between Hive 1.2 and Hive 2.3 due to a `HiveDecimal` or `SerDe` difference
(the root cause is not known yet).

Closes #27556 from Ngone51/script_transform.

Lead-authored-by: yi.wu 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 99b8136a86030411e6bcbd312f40eb2a901ab0f0)
Signed-off-by: Wenchen Fan 
---
 .../hive/execution/ScriptTransformationExec.scala  | 32 +--
 sql/hive/src/test/resources/test_script.py | 21 ++
 .../hive/execution/ScriptTransformationSuite.scala | 46 +-
 3 files changed, 85 insertions(+), 14 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index e12f663..40f7b4e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -94,9 +94,8 @@ case class ScriptTransformationExec(
   // This new thread will consume the ScriptTransformation's input rows 
and write them to the
   // external process. That process's output will be read by this current 
thread.
   val writerThread = new ScriptTransformationWriterThread(
-inputIterator,
+inputIterator.map(outputProjection),
 input.map(_.dataType),
-outputProjection,
 inputSerde,
 inputSoi,
 ioschema,
@@ -249,16 +248,15 @@ case class ScriptTransformationExec(
 private class ScriptTransformationWriterThread(
 iter: Iterator[InternalRow],
 inputSchema: Seq[DataType],
-outputProjection: Projection,
 @Nullable inputSerde: AbstractSerDe,
-@Nullable inputSoi: ObjectInspector,
+@Nullable inputSoi: StructObjectInspector,
 ioschema: 

[spark] branch master updated (b2134ee -> 99b8136)

2020-02-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b2134ee  [SPARK-30823][PYTHON][DOCS] Set `%PYTHONPATH%` when building 
PySpark documentation on Windows
 add 99b8136  [SPARK-25990][SQL] ScriptTransformation should handle 
different data types correctly

No new revisions were added by this update.

Summary of changes:
 .../hive/execution/ScriptTransformationExec.scala  | 32 +--
 .../hive/src/test/resources/test_script.py | 11 +++---
 .../hive/execution/ScriptTransformationSuite.scala | 46 +-
 3 files changed, 69 insertions(+), 20 deletions(-)
 copy dev/scalafmt => sql/hive/src/test/resources/test_script.py (84%)
 mode change 100755 => 100644


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


