[spark] branch branch-3.0 updated: [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2824fec9  [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`
2824fec9 is described below

commit 2824fec9fa57444b7c64edb8226cf75bb87a2e5d
Author: DB Tsai
AuthorDate: Fri Feb 14 21:46:01 2020 +0000

    [SPARK-30289][SQL] Partitioned by Nested Column for `InMemoryTable`

    ### What changes were proposed in this pull request?
    `InMemoryTable` was flattening the nested columns, and the flattened
    column names were then used to look up field indices, which is not
    correct. This PR implements partitioning by a nested column for
    `InMemoryTable`.

    ### Why are the changes needed?
    This PR implements partitioning by a nested column for `InMemoryTable`,
    so we can test this feature in DSv2.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Existing unit tests and new tests.

    Closes #26929 from dbtsai/addTests.

    Authored-by: DB Tsai
    Signed-off-by: DB Tsai
    (cherry picked from commit d0f961476031b62bda0d4d41f7248295d651ea92)
    Signed-off-by: DB Tsai
---
 .../apache/spark/sql/connector/InMemoryTable.scala | 35 +++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  | 86 +-
 2 files changed, 114 insertions(+), 7 deletions(-)
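To make the feature concrete, here is a minimal sketch of the DSv2 usage this unblocks. The catalog name `testcat`, the table name, and the sample schema are illustrative assumptions mirroring how the test suites wire things up; `InMemoryTableCatalog` lives in the catalyst test sources, so this only runs on a test classpath:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

// Assumed setup (not part of this patch): register the in-memory test
// catalog under the name `testcat`, as the DSv2 test suites do.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.testcat",
    "org.apache.spark.sql.connector.InMemoryTableCatalog")
  .getOrCreate()
import spark.implicits._

// Build a frame with a nested struct column `ts`.
val df = Seq((1L, "2020-01-01", "UTC"), (2L, "2020-01-02", "PST"))
  .toDF("id", "date", "timezone")
  .select(col("id"), struct(col("date"), col("timezone")).as("ts"))

// Partitioning by the nested column `ts.timezone`: before this patch,
// InMemoryTable looked up the flattened leaf name in the top-level schema,
// which was incorrect.
df.writeTo("testcat.table_name")
  .partitionedBy(col("ts.timezone"))
  .create()
```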
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index c9e4e0a..0187ae3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -26,7 +26,7 @@ import org.scalatest.Assertions._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{IdentityTransform, NamedReference, Transform}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
@@ -59,10 +59,30 @@ class InMemoryTable(
 
   def rows: Seq[InternalRow] = dataMap.values.flatMap(_.rows).toSeq
 
-  private val partFieldNames = partitioning.flatMap(_.references).toSeq.flatMap(_.fieldNames)
-  private val partIndexes = partFieldNames.map(schema.fieldIndex)
+  private val partCols: Array[Array[String]] = partitioning.flatMap(_.references).map { ref =>
+    schema.findNestedField(ref.fieldNames(), includeCollections = false) match {
+      case Some(_) => ref.fieldNames()
+      case None => throw new IllegalArgumentException(s"${ref.describe()} does not exist.")
+    }
+  }
 
-  private def getKey(row: InternalRow): Seq[Any] = partIndexes.map(row.toSeq(schema)(_))
+  private def getKey(row: InternalRow): Seq[Any] = {
+    def extractor(fieldNames: Array[String], schema: StructType, row: InternalRow): Any = {
+      val index = schema.fieldIndex(fieldNames(0))
+      val value = row.toSeq(schema).apply(index)
+      if (fieldNames.length > 1) {
+        (value, schema(index).dataType) match {
+          case (row: InternalRow, nestedSchema: StructType) =>
+            extractor(fieldNames.drop(1), nestedSchema, row)
+          case (_, dataType) =>
+            throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
+        }
+      } else {
+        value
+      }
+    }
+    partCols.map(fieldNames => extractor(fieldNames, schema, row))
+  }
 
   def withData(data: Array[BufferedRows]): InMemoryTable = dataMap.synchronized {
     data.foreach(_.rows.foreach { row =>
@@ -146,8 +166,10 @@ class InMemoryTable(
   }
 
   private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
-      val deleteKeys = InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+      val deleteKeys = InMemoryTable.filtersToKeys(
+        dataMap.keys, partCols.map(_.toSeq.quoted), filters)
       dataMap --= deleteKeys
       withData(messages.map(_.asInstanceOf[BufferedRows]))
     }
@@ -161,7 +183,8 @@ class InMemoryTable(
   }
 
   override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
-    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partFieldNames, filters)
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+    dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted),
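The heart of the change is the recursive extractor inside `getKey`: it consumes one field name at a time, descending into nested structs until it reaches the leaf value. The following standalone sketch mirrors that recursion against the public `Row` API instead of `InternalRow`; the `extract` name and the sample schema are mine, for illustration only:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Sketch of the recursion the patch adds to getKey: look up the first name
// in the current schema, and if more names remain, recurse into the nested
// struct; anything non-struct with names left over is unsupported.
def extract(fieldNames: Seq[String], schema: StructType, row: Row): Any = {
  val index = schema.fieldIndex(fieldNames.head)
  val value = row.get(index)
  if (fieldNames.length > 1) {
    (value, schema(index).dataType) match {
      case (nested: Row, nestedSchema: StructType) =>
        extract(fieldNames.tail, nestedSchema, nested)
      case (_, dataType) =>
        throw new IllegalArgumentException(s"Unsupported type, ${dataType.simpleString}")
    }
  } else {
    value
  }
}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("ts", StructType(Seq(StructField("timezone", StringType))))))

// Extract the partition value for `ts.timezone` from a nested row.
assert(extract(Seq("ts", "timezone"), schema, Row(1, Row("UTC"))) == "UTC")
```

A related detail of the patch: `filtersToKeys` now receives each partition column as a single quoted multipart name (e.g. `ts.timezone`) via `CatalogV2Implicits.MultipartIdentifierHelper`, rather than the flattened leaf names, so pushed-down equality filters that reference nested partition columns resolve against the correct key position.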