[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user xuanyuanking closed the pull request at: https://github.com/apache/spark/pull/20675
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r171161352

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
```
@@ -33,4 +33,16 @@
    * as a restart checkpoint.
    */
   PartitionOffset getOffset();
+
+  /**
+   * Set the start offset for the current record; this is only used in task retry. If setOffset
+   * keeps its default implementation, the ContinuousDataReader does not support task-level retry.
+   *
+   * @param offset last offset before task retry.
+   */
+  default void setOffset(PartitionOffset offset) {
```
--- End diff --

Cool, that's clearer.
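For context, a minimal sketch of a reader honoring the proposed `setOffset` contract. Everything below is illustrative: `CounterPartitionOffset` and the counter source are hypothetical, and `setOffset` itself is the method this PR proposes, not part of the released `ContinuousDataReader` interface.

```
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

// Hypothetical offset type for a simple counter-based source.
case class CounterPartitionOffset(partition: Int, value: Long) extends PartitionOffset

// A trivially restartable reader: a retried task attempt calls setOffset with
// the recovered offset, and the internal cursor rewinds before the next read.
class CounterContinuousDataReader(partition: Int, start: Long)
    extends ContinuousDataReader[java.lang.Long] {

  private var current: Long = start - 1

  // The method proposed in this PR (not in the stock interface).
  def setOffset(offset: PartitionOffset): Unit = offset match {
    case CounterPartitionOffset(`partition`, value) => current = value
    case other => throw new IllegalArgumentException(s"Unexpected offset: $other")
  }

  override def next(): Boolean = { current += 1; true }
  override def get(): java.lang.Long = current
  override def getOffset(): PartitionOffset = CounterPartitionOffset(partition, current)
  override def close(): Unit = ()
}
```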
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r171014505

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
```
@@ -33,4 +33,16 @@
    * as a restart checkpoint.
    */
   PartitionOffset getOffset();
+
+  /**
+   * Set the start offset for the current record; this is only used in task retry. If setOffset
+   * keeps its default implementation, the ContinuousDataReader does not support task-level retry.
+   *
+   * @param offset last offset before task retry.
+   */
+  default void setOffset(PartitionOffset offset) {
```
--- End diff --

I think it might be better to create a new interface ContinuousDataReaderFactory, and implement this there as something like `createDataReaderWithOffset(PartitionOffset offset)`. That way the intended lifecycle is explicit.
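A rough sketch of the factory shape being suggested here. The trait and method below illustrate the proposal rather than an existing Spark API, and the `UnsupportedOperationException` default is an assumption:

```
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

// Instead of mutating a live reader via setOffset, a retried task would ask
// the factory for a fresh reader positioned at the recovered offset. This
// makes the reader's lifecycle explicit: its start offset is fixed at creation.
trait ContinuousDataReaderFactory[T] {
  def createDataReader(): ContinuousDataReader[T]

  // Sources that cannot support task-level retry simply keep this default.
  def createDataReaderWithOffset(offset: PartitionOffset): ContinuousDataReader[T] =
    throw new UnsupportedOperationException(
      "This source does not support restarting a reader at an arbitrary offset.")
}
```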
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r171011587

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
```
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results, CheckAnswerRowsContains
+        // would also return true.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
```
--- End diff --

Ah, right, my bad.
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r170830121

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
```
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results, CheckAnswerRowsContains
+        // would also return true.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
```
--- End diff --

Actually, I first used `CheckAnswer(0 to 19: _*)` here, but the test case failed, probably because the continuous processing query does not always stop exactly at Range(0, 20). See the logs below:

```
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- *(1) Project [value#13L]
   +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
org.scalatest.exceptions.TestFailedException:

== Results ==
!== Correct Answer - 20 ==   == Spark Answer - 25 ==
!struct                      struct
 [0]                         [0]
 [10]                        [10]
 [11]                        [11]
 [12]                        [12]
 [13]                        [13]
 [14]                        [14]
 [15]                        [15]
 [16]                        [16]
 [17]                        [17]
 [18]                        [18]
 [19]                        [19]
 [1]                         [1]
![2]                         [20]
![3]                         [21]
![4]                         [22]
![5]                         [23]
![6]                         [24]
![7]                         [2]
![8]                         [3]
![9]                         [4]
!                            [5]
!                            [6]
!                            [7]
!                            [8]
!                            [9]

== Progress ==
   StartStream(ContinuousTrigger(360),org.apache.spark.util.SystemClock@343e225a,Map(),null)
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
=> CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
   StopStream
```
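This failure is why the test needs a "contains exactly once" check rather than an exact-set check: the stream may legitimately emit rows beyond 19 before stopping, but a duplicate within 0-19 would indicate a broken retry. A minimal sketch of the core comparison such a `StreamAction` could delegate to (the helper name and messages are illustrative, not the PR's code):

```
import org.apache.spark.sql.Row

// Every expected row must appear exactly once in the sink output; extra,
// unexpected rows are tolerated, but duplicates and omissions are not.
def checkContainsOnlyOnce(expected: Seq[Row], actual: Seq[Row]): Option[String] = {
  val counts = actual.groupBy(identity).mapValues(_.size)
  val problems = expected.flatMap { row =>
    counts.getOrElse(row, 0) match {
      case 1 => None
      case 0 => Some(s"missing row: $row")
      case n => Some(s"row $row appeared $n times")
    }
  }
  if (problems.isEmpty) None else Some(problems.mkString("\n"))
}
```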
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r170665920

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
```
@@ -194,6 +194,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
   }
+
+  case class CheckAnswerRowsContainsOnlyOnce(expectedAnswer: Seq[Row], lastOnly: Boolean = false)
```
--- End diff --

no need to add this - redundant with CheckAnswer
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20675#discussion_r170665692

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
```
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results, CheckAnswerRowsContains
+        // would also return true.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
```
--- End diff --

Checking the exact answer can just be `CheckAnswer(0 to 20: _*)`.
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/20675

[SPARK-23033][SS][Follow Up] Task level retry for continuous processing

## What changes were proposed in this pull request?

This reimplements task-level retry for continuous processing. The changes include:

1. Add a new `EpochCoordinatorMessage` named `GetLastEpochAndOffset`, used to fetch the last epoch and offset of a particular partition when a task restarts.
2. Add a `setOffset` function to `ContinuousDataReader`, so that a reader can restart from a given offset.

## How was this patch tested?

Added a new UT in `ContinuousSuite` and a new `StreamAction` named `CheckAnswerRowsContainsOnlyOnce` for more accurate result checking.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuanyuanking/spark SPARK-23033

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20675.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20675

commit 21f574e2a3ad3c8e68b92776d2a141d7fcb90502
Author: Yuanjian Li
Date: 2018-02-26T07:27:10Z

    [SPARK-23033][SS][Follow Up] Task level retry for continuous processing
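For a sense of how piece (1) fits together, here is a hedged sketch of the message round-trip on task restart. The case-class shapes and the recovery snippet are assumptions for illustration; only the message name `GetLastEpochAndOffset` and the `setOffset` call come from the PR itself:

```
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

// Hypothetical shapes: a restarted task asks the EpochCoordinator which epoch
// it had reached and the last committed offset of its partition.
case class GetLastEpochAndOffset(partitionId: Int)
case class LastEpochAndOffset(epoch: Long, offset: Option[PartitionOffset])

// Sketch of the recovery path in the task (pseudocode-level):
//   if (context.attemptNumber() > 0) {
//     val last = epochCoordinator.askSync[LastEpochAndOffset](
//       GetLastEpochAndOffset(context.partitionId()))
//     last.offset.foreach(reader.setOffset)  // resume instead of failing the query
//   }
```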