[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-11-04 Thread xuanyuanking
Github user xuanyuanking closed the pull request at:

https://github.com/apache/spark/pull/20675


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r171161352
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
@@ -33,4 +33,16 @@
      * as a restart checkpoint.
      */
     PartitionOffset getOffset();
+
+    /**
+     * Set the start offset for the current record; only used in task retry.
+     * If setOffset keeps its default implementation, the current
+     * ContinuousDataReader can't support task-level retry.
+     *
+     * @param offset the last offset before the task retry.
+     */
+    default void setOffset(PartitionOffset offset) {
--- End diff --

Cool, that's clearer.


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r171014505
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
@@ -33,4 +33,16 @@
      * as a restart checkpoint.
      */
     PartitionOffset getOffset();
+
+    /**
+     * Set the start offset for the current record; only used in task retry.
+     * If setOffset keeps its default implementation, the current
+     * ContinuousDataReader can't support task-level retry.
+     *
+     * @param offset the last offset before the task retry.
+     */
+    default void setOffset(PartitionOffset offset) {
--- End diff --

I think it might be better to create a new interface ContinuousDataReaderFactory, and implement this there as something like `createDataReaderWithOffset(PartitionOffset offset)`. That way the intended lifecycle is explicit.
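
For illustration, a minimal Scala sketch of that suggestion (the actual DataSourceV2 interfaces of this era are Java, and `createDataReaderWithOffset` is only the name proposed above; `DataReaderFactory`, `ContinuousDataReader`, and `PartitionOffset` are the existing API types):

```scala
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

// Sketch of the proposed interface, not actual Spark code: a factory that
// can rebuild its reader at an explicit offset, so restart-on-retry is part
// of the reader's construction rather than a mutable setOffset call.
trait ContinuousDataReaderFactory[T] extends DataReaderFactory[T] {
  def createDataReaderWithOffset(offset: PartitionOffset): ContinuousDataReader[T]
}
```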


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-27 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r171011587
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results,
+        // CheckAnswerRowsContains would still pass.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff --

Ah, right, my bad.


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r170830121
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results,
+        // CheckAnswerRowsContains would still pass.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff --

Actually I first used `CheckAnswer(0 to 19: _*)` here, but the test case failed, probably because continuous processing does not always stop exactly at the end of Range(0, 20). See the logs below:
```
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- Project [value#13L]
   +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
+- *(1) Project [value#13L]
   +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45

ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
org.scalatest.exceptions.TestFailedException:

== Results ==
!== Correct Answer - 20 ==   == Spark Answer - 25 ==
!struct                      struct
 [0]                         [0]
 [10]                        [10]
 [11]                        [11]
 [12]                        [12]
 [13]                        [13]
 [14]                        [14]
 [15]                        [15]
 [16]                        [16]
 [17]                        [17]
 [18]                        [18]
 [19]                        [19]
 [1]                         [1]
![2]                         [20]
![3]                         [21]
![4]                         [22]
![5]                         [23]
![6]                         [24]
![7]                         [2]
![8]                         [3]
![9]                         [4]
!                            [5]
!                            [6]
!                            [7]
!                            [8]
!                            [9]

== Progress ==
   StartStream(ContinuousTrigger(360),org.apache.spark.util.SystemClock@343e225a,Map(),null)
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
   AssertOnQuery(, )
=> CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
   StopStream
```
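
For reference, a minimal sketch of the semantics `CheckAnswerRowsContainsOnlyOnce` needs, written as a hypothetical standalone assertion rather than the PR's actual `StreamAction`:

```scala
import org.apache.spark.sql.Row

// Hypothetical helper: every expected row must appear exactly once in the
// sink output, so rows replayed by a retried task fail the test, while
// extra in-flight rows beyond the expected range are simply ignored.
def assertContainsOnlyOnce(sinkRows: Seq[Row], expectedAnswer: Seq[Row]): Unit = {
  val counts = sinkRows.groupBy(identity).map { case (row, rows) => (row, rows.size) }
  expectedAnswer.foreach { row =>
    val n = counts.getOrElse(row, 0)
    assert(n == 1, s"expected $row exactly once, but found it $n time(s)")
  }
}
```

A membership-only check like `CheckAnswerRowsContains` would still pass if the retried task replayed rows; counting occurrences is what distinguishes exactly-once output from at-least-once here.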


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-26 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r170665920
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -194,6 +194,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
   }
 
+  case class CheckAnswerRowsContainsOnlyOnce(expectedAnswer: Seq[Row], lastOnly: Boolean = false)
--- End diff --

No need to add this; it's redundant with CheckAnswer.


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-26 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20675#discussion_r170665692
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
     spark.sparkContext.addSparkListener(listener)
     try {
       testStream(df, useV2Sink = true)(
-        StartStream(Trigger.Continuous(100)),
+        StartStream(longContinuousTrigger),
+        AwaitEpoch(0),
         Execute(waitForRateSourceTriggers(_, 2)),
+        IncrementEpoch(),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
             assert(taskId != -1)
           }
           spark.sparkContext.killTaskAttempt(taskId)
         },
-        ExpectFailure[SparkException] { e =>
-          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
-        })
+        Execute(waitForRateSourceTriggers(_, 4)),
+        IncrementEpoch(),
+        // Check the answer exactly; if there were duplicated results,
+        // CheckAnswerRowsContains would still pass.
+        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
--- End diff --

Checking the exact answer can just be `CheckAnswer(0 to 20: _*)`.


---




[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

2018-02-25 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/20675

[SPARK-23033][SS][Follow Up] Task level retry for continuous processing

## What changes were proposed in this pull request?

Here we want to reimplement the task-level retry for continuous processing. The changes include:
1. Add a new `EpochCoordinatorMessage` named `GetLastEpochAndOffset`, used to get the last epoch and offset of a particular partition when a task restarts.
2. Add a `setOffset` function to `ContinuousDataReader`, so that a reader can restart from a given offset (see the sketch after this list).
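
A rough sketch of how these two pieces could fit together on restart (assumed shapes for illustration; only `PartitionOffset` and `setOffset` mirror names actually used in this PR):

```scala
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

// Hypothetical message shape; in Spark it would extend the sealed
// EpochCoordinatorMessage trait inside EpochCoordinator.scala.
case class GetLastEpochAndOffset(partitionId: Int)

// Stand-in for a ContinuousDataReader supporting the new setOffset method.
trait OffsetSettableReader {
  def setOffset(offset: PartitionOffset): Unit
}

// On task retry: ask the coordinator where the failed attempt left off,
// then rewind the reader before reading resumes. `askCoordinator` stands
// in for the actual RPC to the EpochCoordinator.
def resumeAfterRetry(
    askCoordinator: GetLastEpochAndOffset => (Long, PartitionOffset),
    partitionId: Int,
    reader: OffsetSettableReader): Unit = {
  val (lastEpoch, lastOffset) = askCoordinator(GetLastEpochAndOffset(partitionId))
  // lastEpoch would seed the restarted task's epoch counter; elided here.
  reader.setOffset(lastOffset) // a reader keeping the default no-op can't support retry
}
```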

## How was this patch tested?

Added a new UT in `ContinuousSuite` and a new `StreamAction` named `CheckAnswerRowsContainsOnlyOnce` for more accurate result checking.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-23033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20675


commit 21f574e2a3ad3c8e68b92776d2a141d7fcb90502
Author: Yuanjian Li 
Date:   2018-02-26T07:27:10Z

[SPARK-23033][SS][Follow Up] Task level retry for continuous processing




---
