Repository: spark Updated Branches: refs/heads/branch-2.0 3b6463a79 -> 1c2dff1ee
[SPARK-17643] Remove comparable requirement from Offset (backport for branch-2.0) ## What changes were proposed in this pull request? Backport https://github.com/apache/spark/commit/988c71457354b0a443471f501cef544a85b1a76a to branch-2.0 ## How was this patch tested? Jenkins Author: Michael Armbrust <mich...@databricks.com> Closes #15362 from zsxwing/SPARK-17643-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c2dff1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c2dff1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c2dff1e Branch: refs/heads/branch-2.0 Commit: 1c2dff1eeeb045f3f5c3c1423ba07371b03965d7 Parents: 3b6463a Author: Michael Armbrust <mich...@databricks.com> Authored: Wed Oct 5 16:48:43 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Oct 5 16:48:43 2016 -0700 ---------------------------------------------------------------------- .../execution/streaming/CompositeOffset.scala | 30 --------------- .../sql/execution/streaming/LongOffset.scala | 6 --- .../spark/sql/execution/streaming/Offset.scala | 19 ++-------- .../execution/streaming/StreamExecution.scala | 9 +++-- .../spark/sql/streaming/OffsetSuite.scala | 39 -------------------- 5 files changed, 9 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1c2dff1e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 729c846..ebc6ee8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -24,36 +24,6 @@ package org.apache.spark.sql.execution.streaming */ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { /** - * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, - * or greater than the specified object. - */ - override def compareTo(other: Offset): Int = other match { - case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size => - val comparisons = offsets.zip(otherComposite.offsets).map { - case (Some(a), Some(b)) => a compareTo b - case (None, None) => 0 - case (None, _) => -1 - case (_, None) => 1 - } - val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet - nonZeroSigns.size match { - case 0 => 0 // if both empty or only 0s - case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s) - case _ => // there are both 1s and -1s - throw new IllegalArgumentException( - s"Invalid comparison between non-linear histories: $this <=> $other") - } - case _ => - throw new IllegalArgumentException(s"Cannot compare $this <=> $other") - } - - private def sign(num: Int): Int = num match { - case i if i < 0 => -1 - case i if i == 0 => 0 - case i if i > 0 => 1 - } - - /** * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of * sources. * http://git-wip-us.apache.org/repos/asf/spark/blob/1c2dff1e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index bb17640..c5e8827 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming */ case class LongOffset(offset: Long) extends Offset { - override def compareTo(other: Offset): Int = other match { - case l: LongOffset => offset.compareTo(l.offset) - case _ => - throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}") - } - def +(increment: Long): LongOffset = new LongOffset(offset + increment) def -(decrement: Long): LongOffset = new LongOffset(offset - decrement) http://git-wip-us.apache.org/repos/asf/spark/blob/1c2dff1e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala index 2cc0128..1f52abf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala @@ -19,19 +19,8 @@ package org.apache.spark.sql.execution.streaming /** * An offset is a monotonically increasing metric used to track progress in the computation of a - * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent - * with `equals` and `hashcode`. + * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global + * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no + * new data has arrived. */ -trait Offset extends Serializable { - - /** - * Returns a negative integer, zero, or a positive integer as this object is less than, equal to, - * or greater than the specified object. - */ - def compareTo(other: Offset): Int - - def >(other: Offset): Boolean = compareTo(other) > 0 - def <(other: Offset): Boolean = compareTo(other) < 0 - def <=(other: Offset): Boolean = compareTo(other) <= 0 - def >=(other: Offset): Boolean = compareTo(other) >= 0 -} +trait Offset extends Serializable {} http://git-wip-us.apache.org/repos/asf/spark/blob/1c2dff1e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b7587f2..d50dfd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -259,7 +259,7 @@ class StreamExecution( case (source, available) => committedOffsets .get(source) - .map(committed => committed < available) + .map(committed => committed != available) .getOrElse(true) } } @@ -318,7 +318,8 @@ class StreamExecution( // Request unprocessed data from all sources. val newData = availableOffsets.flatMap { - case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => + case (source, available) + if committedOffsets.get(source).map(_ != available).getOrElse(true) => val current = committedOffsets.get(source) val batch = source.getBatch(current, available) logDebug(s"Retrieving data from $source: $current -> $available") @@ -404,10 +405,10 @@ class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is indented for use primarily when writing tests. */ - def awaitOffset(source: Source, newOffset: Offset): Unit = { + private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = { def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset + !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset } while (notDone) { http://git-wip-us.apache.org/repos/asf/spark/blob/1c2dff1e/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala index 9590af4..b65a987 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala @@ -24,44 +24,12 @@ trait OffsetSuite extends SparkFunSuite { /** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */ def compare(one: Offset, two: Offset): Unit = { test(s"comparison $one <=> $two") { - assert(one < two) - assert(one <= two) - assert(one <= one) - assert(two > one) - assert(two >= one) - assert(one >= one) assert(one == one) assert(two == two) assert(one != two) assert(two != one) } } - - /** Creates test to check that non-equality comparisons throw exception. */ - def compareInvalid(one: Offset, two: Offset): Unit = { - test(s"invalid comparison $one <=> $two") { - intercept[IllegalArgumentException] { - assert(one < two) - } - - intercept[IllegalArgumentException] { - assert(one <= two) - } - - intercept[IllegalArgumentException] { - assert(one > two) - } - - intercept[IllegalArgumentException] { - assert(one >= two) - } - - assert(!(one == two)) - assert(!(two == one)) - assert(one != two) - assert(two != one) - } - } } class LongOffsetSuite extends OffsetSuite { @@ -79,10 +47,6 @@ class CompositeOffsetSuite extends OffsetSuite { one = CompositeOffset(None :: Nil), two = CompositeOffset(Some(LongOffset(2)) :: Nil)) - compareInvalid( // sizes must be same - one = CompositeOffset(Nil), - two = CompositeOffset(Some(LongOffset(2)) :: Nil)) - compare( one = CompositeOffset.fill(LongOffset(0), LongOffset(1)), two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) @@ -91,8 +55,5 @@ class CompositeOffsetSuite extends OffsetSuite { one = CompositeOffset.fill(LongOffset(1), LongOffset(1)), two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) - compareInvalid( - one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent - two = CompositeOffset.fill(LongOffset(1), LongOffset(2))) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org