Repository: spark
Updated Branches:
  refs/heads/master b9a6f7499 -> dc8a6befa


[SPARK-24588][SS] streaming join should require HashClusteredDistribution from children

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/19080 we simplified the 
distribution/partitioning framework and made all join-like operators require 
`HashClusteredDistribution` from their children. Unfortunately, the streaming 
join operator was missed.

This can cause wrong results. Consider:
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash-partitioned by `a, b`, while the right table is hash-partitioned 
by `b` alone. As a result, two rows that match on `(a, b)` can land in different 
partitions, and the join misses them: records that should appear in the output are 
silently dropped.
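
To make the failure mode concrete, here is a minimal plain-Scala sketch. The `partition` function below is a hypothetical stand-in for Spark's hash partitioner (Spark actually uses `pmod(murmur3Hash(keys), numPartitions)`), but any hash function exhibits the same mismatch:
```
// Hypothetical stand-in for Spark's hash partitioner, for illustration only.
def partition(key: Any, numPartitions: Int): Int =
  ((key.hashCode % numPartitions) + numPartitions) % numPartitions

val numPartitions = 5
val a = 3
val b = a * 2 // the row (a = 3, b = 6) exists on both sides of the join

val leftPartition  = partition((a, b), numPartitions) // left child: keyed by (a, b)
val rightPartition = partition(b, numPartitions)      // right child: keyed by b only

// In general leftPartition != rightPartition, so the join task that holds the
// left row never sees its match from the right side, and the row is dropped.
```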

## How was this patch tested?

A new regression test in `StreamingJoinSuite` ("streaming join should require 
HashClusteredDistribution from children"), which joins two memory streams with the 
right side repartitioned by `'b` and verifies that all matching rows are produced.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21587 from cloud-fan/join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc8a6bef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc8a6bef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc8a6bef

Branch: refs/heads/master
Commit: dc8a6befa5dad861a731b4d7865f3ccf37482ae0
Parents: b9a6f74
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Jun 21 15:38:46 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Jun 21 15:38:46 2018 -0700

----------------------------------------------------------------------
 .../catalyst/plans/physical/partitioning.scala  |  53 +++--
 .../spark/sql/catalyst/DistributionSuite.scala  | 201 +++++++++++++++----
 .../datasources/v2/DataSourcePartitioning.scala |   4 +-
 .../StreamingSymmetricHashJoinExec.scala        |   4 +-
 .../sql/streaming/StreamingJoinSuite.scala      |  14 ++
 5 files changed, 217 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc8a6bef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 4d9a992..cc1a5e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -99,16 +99,19 @@ case class ClusteredDistribution(
  * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
  * number of partitions, this distribution strictly requires which partition the tuple should be in.
  */
-case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
+case class HashClusteredDistribution(
+    expressions: Seq[Expression],
+    requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
     expressions != Nil,
-    "The expressions for hash of a HashPartitionedDistribution should not be Nil. " +
+    "The expressions for hash of a HashClusteredDistribution should not be Nil. " +
       "An AllTuples should be used to represent a distribution that only has " +
       "a single partition.")
 
-  override def requiredNumPartitions: Option[Int] = None
-
   override def createPartitioning(numPartitions: Int): Partitioning = {
+    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
+      s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
+        s"the actual number of partitions is $numPartitions.")
     HashPartitioning(expressions, numPartitions)
   }
 }
@@ -163,11 +166,22 @@ trait Partitioning {
    * i.e. the current dataset does not need to be re-partitioned for the `required`
    * Distribution (it is possible that tuples within a partition need to be reorganized).
    *
+   * A [[Partitioning]] can never satisfy a [[Distribution]] if its `numPartitions` doesn't match
+   * [[Distribution.requiredNumPartitions]].
+   */
+  final def satisfies(required: Distribution): Boolean = {
+    required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
+  }
+
+  /**
+   * The actual method that defines whether this [[Partitioning]] can satisfy the given
+   * [[Distribution]], after the `numPartitions` check.
+   *
    * By default a [[Partitioning]] can satisfy [[UnspecifiedDistribution]], and [[AllTuples]] if
-   * the [[Partitioning]] only have one partition. Implementations can overwrite this method with
-   * special logic.
+   * the [[Partitioning]] only have one partition. Implementations can also overwrite this method
+   * with special logic.
    */
-  def satisfies(required: Distribution): Boolean = required match {
+  protected def satisfies0(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
     case AllTuples => numPartitions == 1
     case _ => false
@@ -186,9 +200,8 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 
-  override def satisfies(required: Distribution): Boolean = required match {
+  override def satisfies0(required: Distribution): Boolean = required match {
     case _: BroadcastDistribution => false
-    case ClusteredDistribution(_, Some(requiredNumPartitions)) => requiredNumPartitions == 1
     case _ => true
   }
 }
@@ -205,16 +218,15 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType
 
-  override def satisfies(required: Distribution): Boolean = {
-    super.satisfies(required) || {
+  override def satisfies0(required: Distribution): Boolean = {
+    super.satisfies0(required) || {
       required match {
         case h: HashClusteredDistribution =>
           expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
             case (l, r) => l.semanticEquals(r)
           }
-        case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
-          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
-            (requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
+        case ClusteredDistribution(requiredClustering, _) =>
+          expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
         case _ => false
       }
     }
@@ -246,15 +258,14 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def nullable: Boolean = false
   override def dataType: DataType = IntegerType
 
-  override def satisfies(required: Distribution): Boolean = {
-    super.satisfies(required) || {
+  override def satisfies0(required: Distribution): Boolean = {
+    super.satisfies0(required) || {
       required match {
         case OrderedDistribution(requiredOrdering) =>
           val minSize = Seq(requiredOrdering.size, ordering.size).min
           requiredOrdering.take(minSize) == ordering.take(minSize)
-        case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
-          ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
-            (requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
+        case ClusteredDistribution(requiredClustering, _) =>
+          ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
         case _ => false
       }
     }
@@ -295,7 +306,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
    * Returns true if any `partitioning` of this collection satisfies the given
    * [[Distribution]].
    */
-  override def satisfies(required: Distribution): Boolean =
+  override def satisfies0(required: Distribution): Boolean =
     partitionings.exists(_.satisfies(required))
 
   override def toString: String = {
@@ -310,7 +321,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
 case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
   override val numPartitions: Int = 1
 
-  override def satisfies(required: Distribution): Boolean = required match {
+  override def satisfies0(required: Distribution): Boolean = required match {
     case BroadcastDistribution(m) if m == mode => true
     case _ => false
   }
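
For readers skimming the diff: the net effect of the `partitioning.scala` changes is a template-method split. A condensed sketch of the new contract (simplified from the hunks above, not the full trait):
```
// Condensed from the diff above: `satisfies` is now final, so the
// requiredNumPartitions check can no longer be bypassed by a subclass,
// and subclasses override only `satisfies0`.
trait Partitioning {
  def numPartitions: Int

  final def satisfies(required: Distribution): Boolean =
    required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)

  // Distribution-specific logic, checked only after numPartitions matches.
  protected def satisfies0(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case AllTuples => numPartitions == 1
    case _ => false
  }
}
```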

http://git-wip-us.apache.org/repos/asf/spark/blob/dc8a6bef/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index b47b8ad..3922810 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -41,34 +41,127 @@ class DistributionSuite extends SparkFunSuite {
     }
   }
 
-  test("HashPartitioning (with nullSafe = true) is the output partitioning") {
-    // Cases which do not need an exchange between two data properties.
+  test("UnspecifiedDistribution and AllTuples") {
+    // except `BroadcastPartitioning`, all other partitioning can satisfy UnspecifiedDistribution
     checkSatisfied(
-      HashPartitioning(Seq('a, 'b, 'c), 10),
+      UnknownPartitioning(-1),
       UnspecifiedDistribution,
       true)
 
     checkSatisfied(
-      HashPartitioning(Seq('a, 'b, 'c), 10),
-      ClusteredDistribution(Seq('a, 'b, 'c)),
+      RoundRobinPartitioning(10),
+      UnspecifiedDistribution,
       true)
 
     checkSatisfied(
-      HashPartitioning(Seq('b, 'c), 10),
-      ClusteredDistribution(Seq('a, 'b, 'c)),
+      SinglePartition,
+      UnspecifiedDistribution,
+      true)
+
+    checkSatisfied(
+      HashPartitioning(Seq('a), 10),
+      UnspecifiedDistribution,
+      true)
+
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc), 10),
+      UnspecifiedDistribution,
+      true)
+
+    checkSatisfied(
+      BroadcastPartitioning(IdentityBroadcastMode),
+      UnspecifiedDistribution,
+      false)
+
+    // except `BroadcastPartitioning`, all other partitioning can satisfy AllTuples if they have
+    // only one partition.
+    checkSatisfied(
+      UnknownPartitioning(1),
+      AllTuples,
+      true)
+
+    checkSatisfied(
+      UnknownPartitioning(10),
+      AllTuples,
+      false)
+
+    checkSatisfied(
+      RoundRobinPartitioning(1),
+      AllTuples,
+      true)
+
+    checkSatisfied(
+      RoundRobinPartitioning(10),
+      AllTuples,
+      false)
+
+    checkSatisfied(
+      SinglePartition,
+      AllTuples,
+      true)
+
+    checkSatisfied(
+      HashPartitioning(Seq('a), 1),
+      AllTuples,
       true)
 
     checkSatisfied(
+      HashPartitioning(Seq('a), 10),
+      AllTuples,
+      false)
+
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc), 1),
+      AllTuples,
+      true)
+
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc), 10),
+      AllTuples,
+      false)
+
+    checkSatisfied(
+      BroadcastPartitioning(IdentityBroadcastMode),
+      AllTuples,
+      false)
+  }
+
+  test("SinglePartition is the output partitioning") {
+    // SinglePartition can satisfy all the distributions except `BroadcastDistribution`
+    checkSatisfied(
       SinglePartition,
       ClusteredDistribution(Seq('a, 'b, 'c)),
       true)
 
     checkSatisfied(
       SinglePartition,
+      HashClusteredDistribution(Seq('a, 'b, 'c)),
+      true)
+
+    checkSatisfied(
+      SinglePartition,
       OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)),
       true)
 
-    // Cases which need an exchange between two data properties.
+    checkSatisfied(
+      SinglePartition,
+      BroadcastDistribution(IdentityBroadcastMode),
+      false)
+  }
+
+  test("HashPartitioning is the output partitioning") {
+    // HashPartitioning can satisfy ClusteredDistribution iff its hash expressions are a subset of
+    // the required clustering expressions.
+    checkSatisfied(
+      HashPartitioning(Seq('a, 'b, 'c), 10),
+      ClusteredDistribution(Seq('a, 'b, 'c)),
+      true)
+
+    checkSatisfied(
+      HashPartitioning(Seq('b, 'c), 10),
+      ClusteredDistribution(Seq('a, 'b, 'c)),
+      true)
+
     checkSatisfied(
       HashPartitioning(Seq('a, 'b, 'c), 10),
       ClusteredDistribution(Seq('b, 'c)),
@@ -79,37 +172,43 @@ class DistributionSuite extends SparkFunSuite {
       ClusteredDistribution(Seq('d, 'e)),
       false)
 
+    // HashPartitioning can satisfy HashClusteredDistribution iff its hash expressions are exactly
+    // same with the required hash clustering expressions.
     checkSatisfied(
       HashPartitioning(Seq('a, 'b, 'c), 10),
-      AllTuples,
+      HashClusteredDistribution(Seq('a, 'b, 'c)),
+      true)
+
+    checkSatisfied(
+      HashPartitioning(Seq('c, 'b, 'a), 10),
+      HashClusteredDistribution(Seq('a, 'b, 'c)),
       false)
 
     checkSatisfied(
+      HashPartitioning(Seq('a, 'b), 10),
+      HashClusteredDistribution(Seq('a, 'b, 'c)),
+      false)
+
+    // HashPartitioning cannot satisfy OrderedDistribution
+    checkSatisfied(
       HashPartitioning(Seq('a, 'b, 'c), 10),
       OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)),
       false)
 
     checkSatisfied(
-      HashPartitioning(Seq('b, 'c), 10),
+      HashPartitioning(Seq('a, 'b, 'c), 1),
       OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)),
-      false)
+      false) // TODO: this can be relaxed.
 
-    // TODO: We should check functional dependencies
-    /*
     checkSatisfied(
-      ClusteredDistribution(Seq('b)),
-      ClusteredDistribution(Seq('b + 1)),
-      true)
-    */
+      HashPartitioning(Seq('b, 'c), 10),
+      OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)),
+      false)
   }
 
   test("RangePartitioning is the output partitioning") {
-    // Cases which do not need an exchange between two data properties.
-    checkSatisfied(
-      RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      UnspecifiedDistribution,
-      true)
-
+    // RangePartitioning can satisfy OrderedDistribution iff its ordering is a prefix
+    // of the required ordering, or the required ordering is a prefix of its ordering.
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
       OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)),
@@ -125,6 +224,27 @@ class DistributionSuite extends SparkFunSuite {
       OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc, 'd.desc)),
       true)
 
+    // TODO: We can have an optimization to first sort the dataset
+    // by a.asc and then sort b, and c in a partition. This optimization
+    // should tradeoff the benefit of a less number of Exchange operators
+    // and the parallelism.
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
+      OrderedDistribution(Seq('a.asc, 'b.desc, 'c.asc)),
+      false)
+
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
+      OrderedDistribution(Seq('b.asc, 'a.asc)),
+      false)
+
+    checkSatisfied(
+      RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
+      OrderedDistribution(Seq('a.asc, 'b.asc, 'd.desc)),
+      false)
+
+    // RangePartitioning can satisfy ClusteredDistribution iff its ordering expressions are a subset
+    // of the required clustering expressions.
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
       ClusteredDistribution(Seq('a, 'b, 'c)),
@@ -140,34 +260,47 @@ class DistributionSuite extends SparkFunSuite {
       ClusteredDistribution(Seq('b, 'c, 'a, 'd)),
       true)
 
-    // Cases which need an exchange between two data properties.
-    // TODO: We can have an optimization to first sort the dataset
-    // by a.asc and then sort b, and c in a partition. This optimization
-    // should tradeoff the benefit of a less number of Exchange operators
-    // and the parallelism.
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      OrderedDistribution(Seq('a.asc, 'b.desc, 'c.asc)),
+      ClusteredDistribution(Seq('a, 'b)),
       false)
 
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      OrderedDistribution(Seq('b.asc, 'a.asc)),
+      ClusteredDistribution(Seq('c, 'd)),
       false)
 
+    // RangePartitioning cannot satisfy HashClusteredDistribution
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      ClusteredDistribution(Seq('a, 'b)),
+      HashClusteredDistribution(Seq('a, 'b, 'c)),
       false)
+  }
 
+  test("Partitioning.numPartitions must match 
Distribution.requiredNumPartitions to satisfy it") {
     checkSatisfied(
-      RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      ClusteredDistribution(Seq('c, 'd)),
+      SinglePartition,
+      ClusteredDistribution(Seq('a, 'b, 'c), Some(10)),
+      false)
+
+    checkSatisfied(
+      SinglePartition,
+      HashClusteredDistribution(Seq('a, 'b, 'c), Some(10)),
+      false)
+
+    checkSatisfied(
+      HashPartitioning(Seq('a, 'b, 'c), 10),
+      ClusteredDistribution(Seq('a, 'b, 'c), Some(5)),
+      false)
+
+    checkSatisfied(
+      HashPartitioning(Seq('a, 'b, 'c), 10),
+      HashClusteredDistribution(Seq('a, 'b, 'c), Some(5)),
       false)
 
     checkSatisfied(
       RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10),
-      AllTuples,
+      ClusteredDistribution(Seq('a, 'b, 'c), Some(5)),
       false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dc8a6bef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
index 017a673..33079d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
@@ -30,8 +30,8 @@ class DataSourcePartitioning(
 
   override val numPartitions: Int = partitioning.numPartitions()
 
-  override def satisfies(required: physical.Distribution): Boolean = {
-    super.satisfies(required) || {
+  override def satisfies0(required: physical.Distribution): Boolean = {
+    super.satisfies0(required) || {
       required match {
         case d: physical.ClusteredDistribution if isCandidate(d.clustering) =>
           val attrs = d.clustering.map(_.asInstanceOf[Attribute])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc8a6bef/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index afa664e..50cf971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -167,8 +167,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
 
   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
-      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+    HashClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      HashClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
 
   override def output: Seq[Attribute] = joinType match {
     case _: InnerLike => left.output ++ right.output
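
The behavioral difference this two-line change buys, expressed with the `DistributionSuite` DSL (a sketch of the semantics, not an excerpt from the patch): `ClusteredDistribution` accepts any co-clustering of the keys, including a hash on a subset of them, while `HashClusteredDistribution` demands exactly the required hash expressions, which forces an `Exchange` on the mis-partitioned child:
```
// Before: a child hash-partitioned on a subset of the join keys was accepted.
HashPartitioning(Seq('b), 5).satisfies(ClusteredDistribution(Seq('a, 'b)))      // true
// After: the streaming join requires the exact hash layout, so Spark inserts
// an Exchange hashpartitioning(a, b) above the repartition('b) child.
HashPartitioning(Seq('b), 5).satisfies(HashClusteredDistribution(Seq('a, 'b)))  // false
```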

http://git-wip-us.apache.org/repos/asf/spark/blob/dc8a6bef/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 1f62357..c5cc8df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -404,6 +404,20 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(input3, 5, 10),
       CheckNewAnswer((5, 10, 5, 15, 5, 25)))
   }
+
+  test("streaming join should require HashClusteredDistribution from 
children") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
+    val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
+    val joined = df1.join(df2, Seq("a", "b")).select('a)
+
+    testStream(joined)(
+      AddData(input1, 1.to(1000): _*),
+      AddData(input2, 1.to(1000): _*),
+      CheckAnswer(1.to(1000): _*))
+  }
 }
 
 

