Repository: spark Updated Branches: refs/heads/master f051f8340 -> 73da3b696
[SPARK-23293][SQL] fix data source v2 self join ## What changes were proposed in this pull request? `DataSourceV2Relation` should extend `MultiInstanceRelation`, to take care of self-join. ## How was this patch tested? a new test Author: Wenchen Fan <wenc...@databricks.com> Closes #20466 from cloud-fan/dsv2-selfjoin. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73da3b69 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73da3b69 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73da3b69 Branch: refs/heads/master Commit: 73da3b6968630d9e2cafc742ccb6d4eb54957df4 Parents: f051f83 Author: Wenchen Fan <wenc...@databricks.com> Authored: Thu Feb 1 10:48:34 2018 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Feb 1 10:48:34 2018 -0800 ---------------------------------------------------------------------- .../sql/execution/datasources/v2/DataSourceV2Relation.scala | 8 +++++++- .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 6 ++++++ 2 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/73da3b69/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 3d4c649..eebfa29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.sources.v2.reader._ case class DataSourceV2Relation( fullOutput: Seq[AttributeReference], - reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder { + reader: DataSourceReader) + extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] @@ -33,6 +35,10 @@ case class DataSourceV2Relation( case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) } + + override def newInstance(): DataSourceV2Relation = { + copy(fullOutput = fullOutput.map(_.newInstance())) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/73da3b69/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 1c3ba78..23147ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -217,6 +217,12 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-23293: data source v2 self join") { + val df = spark.read.format(classOf[SimpleDataSourceV2].getName).load() + val df2 = df.select(($"i" + 1).as("k"), $"j") + checkAnswer(df.join(df2, "j"), (0 until 10).map(i => Row(-i, i, i + 1))) + } } class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org