[spark] branch branch-3.0 updated: [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 2824fec9 [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`
2824fec9 is described below

commit 2824fec9fa57444b7c64edb8226cf75bb87a2e5d
Author: DB Tsai 
AuthorDate: Fri Feb 14 21:46:01 2020 +

[SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

### What changes were proposed in this pull request?
1. `InMemoryTable` was flattening the nested columns, and the flattened column names were then used to look up field indices, which is not correct.

This PR implements partitioning by nested columns for `InMemoryTable`.
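To make the old failure mode concrete, here is a minimal REPL-style sketch (the schema and field names are hypothetical): flattening a nested reference such as `data.country` into its parts means each part is looked up as a top-level field, which either does not exist or binds to the wrong index.

```scala
// Illustrative only: why flattening nested references breaks index lookup.
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("data", StructType(Seq(StructField("country", StringType))))))

// Old behaviour: the nested reference data.country was flattened into its
// parts, and each part was looked up directly against the top-level schema.
val flattened = Seq("data", "country")
flattened.map(schema.fieldIndex)
// fieldIndex("data") is fine, but fieldIndex("country") throws
// IllegalArgumentException because "country" is not a top-level field.
```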

### Why are the changes needed?

This PR implements partitioning by nested columns for `InMemoryTable`, so we can test this feature in DSv2.
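As a rough sketch of the DSv2 usage this makes testable (the `testcat` name and the data are made up; `InMemoryTableCatalog` is the test-only catalog that backs `InMemoryTable`, and `spark` is an active `SparkSession`):

```scala
// Hypothetical DSv2 test scenario; catalog and table names are illustrative.
import org.apache.spark.sql.functions.col

// Register the test-only in-memory catalog under an arbitrary name.
spark.conf.set(
  "spark.sql.catalog.testcat",
  "org.apache.spark.sql.connector.InMemoryTableCatalog")

val df = spark.range(3)
  .selectExpr("id", "named_struct('country', 'US', 'id', id) AS data")

// Create a v2 table partitioned by the nested column data.country;
// before this patch InMemoryTable could not derive partition keys for it.
df.writeTo("testcat.people")
  .partitionedBy(col("data.country"))
  .create()
```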

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing unit tests and new tests.

Closes #26929 from dbtsai/addTests.

Authored-by: DB Tsai 
Signed-off-by: DB Tsai 
(cherry picked from commit d0f961476031b62bda0d4d41f7248295d651ea92)
Signed-off-by: DB Tsai 
---
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index c9e4e0a..0187ae3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -26,7 +26,7 @@ import org.scalatest.Assertions._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{IdentityTransform, NamedReference, Transform}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
@@ -59,10 +59,30 @@ class InMemoryTable(
 
   def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
 
-  private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
-  private val partIndexes = partFieldNames.map(schema.fieldIndex)
+  private val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
 
-  private def getKey(row: InternalRow): Seq[Any] = partIndexes.map(row.toSeq(schema)(_))
+  private def getKey(row: InternalRow): Seq[Any] = {
+    def extractor(fieldNames: Array[String], schema: StructType, row: InternalRow): Any = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        value
+      }
+    }
+    partCols.map(fieldNames => extractor(fieldNames, schema, row))
+  }
 
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
@@ -146,8 +166,10 @@ class InMemoryTable(
   }
 
   private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
-      val deleteKeys = InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+      val deleteKeys = InMemoryTable.filtersToKeys(
+        dataMap.keys, partCols.map(_.toSeq.quoted), filters)
       dataMap --= deleteKeys
       withData(messages.map(_.asInstanceOf[BufferedRows]))
     }
@@ -161,7 +183,8 @@ class InMemoryTable(
   }
 
   override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
-    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters)
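For readers skimming the hunks above, here is a self-contained sketch of the recursive key extraction that the new `getKey` performs (the demo schema, row, and object name are made up for illustration; it only needs the catalyst classes already used by `InMemoryTable`):

```scala
// Standalone illustration of recursive partition-key extraction over nested
// structs, mirroring the extractor added to InMemoryTable.getKey.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object NestedKeyExtractorDemo {
  // Walks a field path such as Array("data", "country") into nested structs.
  def extract(fieldNames: Array[String], schema: StructType, row: InternalRow): Any = {
    val index = schema.fieldIndex(fieldNames.head)
    val value = row.toSeq(schema)(index)
    if (fieldNames.length > 1) {
      (value, schema(index).dataType) match {
        case (nested: InternalRow, nestedSchema: StructType) =>
          extract(fieldNames.tail, nestedSchema, nested)
        case (_, dataType) =>
          throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
      }
    } else {
      value
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("data", StructType(Seq(StructField("country", StringType))))))
    val row = InternalRow(1L, InternalRow(UTF8String.fromString("US")))
    // Partition key for a table partitioned by the nested column data.country.
    println(extract(Array("data", "country"), schema, row))  // prints: US
  }
}
```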
