Repository: spark
Updated Branches:
  refs/heads/master 1cc66a072 -> 05ae74778


[SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests

## What changes were proposed in this pull request?

Unit tests for EpochCoordinator that verify the correct sequencing of committed
epochs. Several tests are ignored because they exercise functionality implemented
in SPARK-23503, which is not yet merged; they would otherwise fail.
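
For readers skimming the diff below: the ordering assertions rest on Mockito's
InOrder API. Here is a minimal, self-contained sketch of that pattern; the
Greeter trait and InOrderSketch object are illustrative names only and appear
nowhere in the patch.

    // Sketch of Mockito's InOrder verification, the pattern
    // EpochCoordinatorSuite uses to assert that epochs commit in sequence.
    // Greeter is a made-up trait for illustration; it is not in the patch.
    import org.mockito.Mockito._

    trait Greeter {
      def greet(name: String): Unit
    }

    object InOrderSketch extends App {
      val greeter = mock(classOf[Greeter])

      // Exercise the mock in a known order.
      greeter.greet("epoch-1")
      greeter.greet("epoch-2")

      // An InOrder verifier fails if the recorded calls happened
      // in any order other than the one asserted here.
      val order = inOrder(greeter)
      order.verify(greeter).greet("epoch-1")
      order.verify(greeter).greet("epoch-2")
    }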

Author: Efim Poberezkin <e...@poberezkin.ru>

Closes #20983 from efimpoberezkin/pr/EpochCoordinator-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05ae7477
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05ae7477
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05ae7477

Branch: refs/heads/master
Commit: 05ae74778a10fbdd7f2cbf7742de7855966b7d35
Parents: 1cc66a0
Author: Efim Poberezkin <e...@poberezkin.ru>
Authored: Tue Apr 17 04:13:17 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Apr 17 04:13:17 2018 -0700

----------------------------------------------------------------------
 .../continuous/EpochCoordinatorSuite.scala      | 224 +++++++++++++++++++
 1 file changed, 224 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/05ae7477/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
new file mode 100644
index 0000000..99e3056
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.mockito.InOrder
+import org.mockito.Matchers.{any, eq => eqTo}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.LocalSparkSession
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.test.TestSparkSession
+
+class EpochCoordinatorSuite
+  extends SparkFunSuite
+    with LocalSparkSession
+    with MockitoSugar
+    with BeforeAndAfterEach {
+
+  private var epochCoordinator: RpcEndpointRef = _
+
+  private var writer: StreamWriter = _
+  private var query: ContinuousExecution = _
+  private var orderVerifier: InOrder = _
+
+  override def beforeEach(): Unit = {
+    val reader = mock[ContinuousReader]
+    writer = mock[StreamWriter]
+    query = mock[ContinuousExecution]
+    orderVerifier = inOrder(writer, query)
+
+    spark = new TestSparkSession()
+
+    epochCoordinator
+      = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, SparkEnv.get)
+  }
+
+  test("single epoch") {
+    setWriterPartitions(3)
+    setReaderPartitions(2)
+
+    commitPartitionEpoch(0, 1)
+    commitPartitionEpoch(1, 1)
+    commitPartitionEpoch(2, 1)
+    reportPartitionOffset(0, 1)
+    reportPartitionOffset(1, 1)
+
+    // Here and in subsequent tests this is called to make a synchronous call to EpochCoordinator
+    // so that the mocks have been acted upon by the time verification happens.
+    makeSynchronousCall()
+
+    verifyCommit(1)
+  }
+
+  test("single epoch, all but one writer partition has committed") {
+    setWriterPartitions(3)
+    setReaderPartitions(2)
+
+    commitPartitionEpoch(0, 1)
+    commitPartitionEpoch(1, 1)
+    reportPartitionOffset(0, 1)
+    reportPartitionOffset(1, 1)
+
+    makeSynchronousCall()
+
+    verifyNoCommitFor(1)
+  }
+
+  test("single epoch, all but one reader partition has reported an offset") {
+    setWriterPartitions(3)
+    setReaderPartitions(2)
+
+    commitPartitionEpoch(0, 1)
+    commitPartitionEpoch(1, 1)
+    commitPartitionEpoch(2, 1)
+    reportPartitionOffset(0, 1)
+
+    makeSynchronousCall()
+
+    verifyNoCommitFor(1)
+  }
+
+  test("consequent epochs, messages for epoch (k + 1) arrive after messages 
for epoch k") {
+    setWriterPartitions(2)
+    setReaderPartitions(2)
+
+    commitPartitionEpoch(0, 1)
+    commitPartitionEpoch(1, 1)
+    reportPartitionOffset(0, 1)
+    reportPartitionOffset(1, 1)
+
+    commitPartitionEpoch(0, 2)
+    commitPartitionEpoch(1, 2)
+    reportPartitionOffset(0, 2)
+    reportPartitionOffset(1, 2)
+
+    makeSynchronousCall()
+
+    verifyCommitsInOrderOf(List(1, 2))
+  }
+
+  ignore("consequent epochs, a message for epoch k arrives after messages for 
epoch (k + 1)") {
+    setWriterPartitions(2)
+    setReaderPartitions(2)
+
+    commitPartitionEpoch(0, 1)
+    commitPartitionEpoch(1, 1)
+    reportPartitionOffset(0, 1)
+
+    commitPartitionEpoch(0, 2)
+    commitPartitionEpoch(1, 2)
+    reportPartitionOffset(0, 2)
+    reportPartitionOffset(1, 2)
+
+    // Message that arrives late
+    reportPartitionOffset(1, 1)
+
+    makeSynchronousCall()
+
+    verifyCommitsInOrderOf(List(1, 2))
+  }
+
+  ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
+    setWriterPartitions(1)
+    setReaderPartitions(1)
+
+    commitPartitionEpoch(0, 1)
+    reportPartitionOffset(0, 1)
+
+    commitPartitionEpoch(0, 3)
+    reportPartitionOffset(0, 3)
+
+    commitPartitionEpoch(0, 4)
+    reportPartitionOffset(0, 4)
+
+    commitPartitionEpoch(0, 2)
+    reportPartitionOffset(0, 2)
+
+    makeSynchronousCall()
+
+    verifyCommitsInOrderOf(List(1, 2, 3, 4))
+  }
+
+  ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
+    setWriterPartitions(1)
+    setReaderPartitions(1)
+
+    commitPartitionEpoch(0, 1)
+    reportPartitionOffset(0, 1)
+
+    commitPartitionEpoch(0, 3)
+    reportPartitionOffset(0, 3)
+
+    commitPartitionEpoch(0, 5)
+    reportPartitionOffset(0, 5)
+
+    commitPartitionEpoch(0, 4)
+    reportPartitionOffset(0, 4)
+
+    commitPartitionEpoch(0, 2)
+    reportPartitionOffset(0, 2)
+
+    makeSynchronousCall()
+
+    verifyCommitsInOrderOf(List(1, 2, 3, 4, 5))
+  }
+
+  private def setWriterPartitions(numPartitions: Int): Unit = {
+    epochCoordinator.askSync[Unit](SetWriterPartitions(numPartitions))
+  }
+
+  private def setReaderPartitions(numPartitions: Int): Unit = {
+    epochCoordinator.askSync[Unit](SetReaderPartitions(numPartitions))
+  }
+
+  private def commitPartitionEpoch(partitionId: Int, epoch: Long): Unit = {
+    val dummyMessage: WriterCommitMessage = mock[WriterCommitMessage]
+    epochCoordinator.send(CommitPartitionEpoch(partitionId, epoch, dummyMessage))
+  }
+
+  private def reportPartitionOffset(partitionId: Int, epoch: Long): Unit = {
+    val dummyOffset: PartitionOffset = mock[PartitionOffset]
+    epochCoordinator.send(ReportPartitionOffset(partitionId, epoch, dummyOffset))
+  }
+
+  private def makeSynchronousCall(): Unit = {
+    epochCoordinator.askSync[Long](GetCurrentEpoch)
+  }
+
+  private def verifyCommit(epoch: Long): Unit = {
+    orderVerifier.verify(writer).commit(eqTo(epoch), any())
+    orderVerifier.verify(query).commit(epoch)
+  }
+
+  private def verifyNoCommitFor(epoch: Long): Unit = {
+    verify(writer, never()).commit(eqTo(epoch), any())
+    verify(query, never()).commit(epoch)
+  }
+
+  private def verifyCommitsInOrderOf(epochs: Seq[Long]): Unit = {
+    epochs.foreach(verifyCommit)
+  }
+}
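
A side note on the makeSynchronousCall trick above: because the coordinator
endpoint processes messages one at a time, a single blocking ask issued after a
batch of fire-and-forget sends guarantees those sends have been handled before
verification runs. The following standalone sketch shows the same idea with a
plain single-threaded executor instead of Spark's RpcEndpointRef; all names in
it are illustrative, not Spark code.

    // Sketch of the "synchronous barrier" idea behind makeSynchronousCall:
    // on a single-threaded message loop, one blocking call flushes all
    // previously queued fire-and-forget messages.
    import java.util.concurrent.{Callable, Executors}

    object SyncBarrierSketch extends App {
      private val loop = Executors.newSingleThreadExecutor()
      private var committed = List.empty[Int]

      // Fire-and-forget, like RpcEndpointRef.send.
      def send(epoch: Int): Unit =
        loop.submit(new Runnable { def run(): Unit = committed :+= epoch })

      // Blocking call, like RpcEndpointRef.askSync: get() does not return
      // until every message queued before it has been processed.
      def askSync(): List[Int] =
        loop.submit(new Callable[List[Int]] { def call(): List[Int] = committed }).get()

      send(1)
      send(2)
      assert(askSync() == List(1, 2)) // earlier sends are guaranteed handled
      loop.shutdown()
    }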


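One more detail worth calling out in verifyCommit: it wraps epoch in eqTo(...)
rather than passing the raw value, because Mockito requires that once any
argument uses a matcher (here any()), every argument must. A small sketch of
that rule; the Writer trait and MatcherSketch object are made-up names for
illustration only.

    // Sketch of the Mockito matcher rule the verifyCommit helper follows:
    // mixing a raw value with a matcher throws InvalidUseOfMatchersException,
    // hence eqTo(epoch) alongside any().
    import org.mockito.Matchers.{any, eq => eqTo}
    import org.mockito.Mockito._

    trait Writer {
      def commit(epoch: Long, messages: Array[String]): Unit
    }

    object MatcherSketch extends App {
      val writer = mock(classOf[Writer])
      writer.commit(1L, Array("m"))

      // Correct: matchers for both arguments.
      verify(writer).commit(eqTo(1L), any())

      // Incorrect (would throw): verify(writer).commit(1L, any())
    }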