[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/12725


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-16 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-219529687
  
LGTM. Merging to master / 2.0. Thanks, @lw-lin 





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-219517153
  
**[Test build #2986 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2986/consoleFull)** for PR 12725 at commit [`a72423b`](https://github.com/apache/spark/commit/a72423b5aab05189c56707897fc638f4c49a3c06).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-219494067
  
**[Test build #2986 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2986/consoleFull)** for PR 12725 at commit [`a72423b`](https://github.com/apache/spark/commit/a72423b5aab05189c56707897fc638f4c49a3c06).





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-15 Thread lw-lin
Github user lw-lin commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-219336043
  
@marmbrus @zsxwing maybe this is ready to go? Thanks!





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/12725#discussion_r63246142
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 --- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 --- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ --- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ -- */
+      // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream  */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun - */
--- End diff --

Failure is the rare case, so I don't think it's that bad to rerun if it 
reduces the complexity of the implementation.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/12725#discussion_r62758276
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 --- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 --- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ --- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ -- */
+      // Check the results of batch 1 again; this is to make sure that, when there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream  */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun - */
--- End diff --

I'm wondering if we can avoid rerunning a batch that has already finished 
before stopping. How about storing the offsets after finishing a batch instead 
of storing them before running a batch? @marmbrus what do you think?
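
The trade-off raised here can be illustrated with a minimal, self-contained sketch. It is not Spark's implementation: `OffsetLog`, `IdempotentSink`, and `WriteAheadDemo` are hypothetical names, and an in-memory map stands in for a durable log. Offsets are recorded before a batch runs, so a restart reruns the latest logged batch; the sink ignores batch ids it has already committed, which is one way to keep that rerun harmless.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a durable offset log: batchId -> (start, end) offsets.
final class OffsetLog {
  private val entries = mutable.LinkedHashMap.empty[Long, (Int, Int)]
  def add(batchId: Long, range: (Int, Int)): Unit = entries(batchId) = range
  def getLatest: Option[(Long, (Int, Int))] = entries.lastOption
}

// Hypothetical sink that skips any batch id it has already committed.
final class IdempotentSink {
  private var latest: Option[Long] = None
  val rows = mutable.ArrayBuffer.empty[Int]
  def latestBatchId: Option[Long] = latest
  def addBatch(batchId: Long, data: Seq[Int]): Unit =
    if (latest.forall(batchId > _)) {
      rows ++= data
      latest = Some(batchId)
    }
}

object WriteAheadDemo {
  // Stand-in for the input stream.
  val source: Seq[Int] = Seq(1, 2, 3, 4, 5, 6)

  def runBatch(log: OffsetLog, sink: IdempotentSink, batchId: Long, range: (Int, Int)): Unit = {
    log.add(batchId, range)                                   // write ahead: record the offsets first
    sink.addBatch(batchId, source.slice(range._1, range._2))  // then execute the batch
  }

  // On restart the log alone cannot tell whether the latest batch finished,
  // so that batch is simply run again.
  def restart(log: OffsetLog, sink: IdempotentSink): Unit =
    log.getLatest.foreach { case (id, range) => runBatch(log, sink, id, range) }
}
```

Logging after the batch instead would avoid the rerun in the clean-stop case, but a crash between the sink write and the log write would then leave the sink ahead of the log.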





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-09 Thread lw-lin
Github user lw-lin commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-218016900
  
@zsxwing would you take another look? Thanks!





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread lw-lin
Github user lw-lin commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217608390
  
I've addressed comments and expanded tests; @zsxwing would you mind taking 
another look? Thanks!





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217606678
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58054/
Test PASSed.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217606677
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217606648
  
**[Test build #58054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58054/consoleFull)** for PR 12725 at commit [`a72423b`](https://github.com/apache/spark/commit/a72423b5aab05189c56707897fc638f4c49a3c06).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217601995
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/12725#discussion_r62410545
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner,
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
  * plan incrementally. Possibly preserving state in between each execution.
  */
-class IncrementalExecution(
+class IncrementalExecution private[sql](
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     outputMode: OutputMode,
     checkpointLocation: String,
-    currentBatchId: Long)
+    val currentBatchId: Long)
--- End diff --

expose this to tests
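
For readers unfamiliar with Scala's qualified access modifiers, the sketch below shows the general mechanism this diff relies on. It is illustrative only: the names `demo`, `Execution`, and `ExecutionSuite` are hypothetical, and an enclosing object stands in for the `sql` package so the snippet runs as a single file. A member or constructor marked `private[demo]` is visible anywhere inside `demo`, including nested test code, but not outside it.

```scala
object demo {
  // The constructor is restricted to code inside `demo`, while `val` turns the
  // parameter into a public read-only field.
  class Execution private[demo] (val currentBatchId: Long) {
    // Visible to everything nested in `demo`, hidden from outside callers.
    private[demo] val offsetLog: Vector[Long] = Vector(currentBatchId)
  }

  // Stand-in for a test suite living in the same enclosing scope.
  object tests {
    object ExecutionSuite {
      def build(id: Long): Execution = new Execution(id)
      def latest(e: Execution): Long = e.offsetLog.last
    }
  }
}
```

Code outside `demo` can still read `currentBatchId`, but writing `new demo.Execution(1L)` or `e.offsetLog` there would not compile.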





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217601994
  
**[Test build #58051 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58051/consoleFull)** for PR 12725 at commit [`d4cd47a`](https://github.com/apache/spark/commit/d4cd47a07bfb395deee0461d0b43be0424110379).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217601996
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58051/
Test FAILed.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/12725#discussion_r62410548
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -122,7 +122,7 @@ class StreamExecution(
    * processing is done.  Thus, the Nth record in this log indicated data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  private val offsetLog =
+  private[sql] val offsetLog =
--- End diff --

expose this to test suites





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217602755
  
**[Test build #58054 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58054/consoleFull)** for PR 12725 at commit [`a72423b`](https://github.com/apache/spark/commit/a72423b5aab05189c56707897fc638f4c49a3c06).





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-217601944
  
**[Test build #58051 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58051/consoleFull)** for PR 12725 at commit [`d4cd47a`](https://github.com/apache/spark/commit/d4cd47a07bfb395deee0461d0b43be0424110379).





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-02 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-216302421
  
Looks pretty good.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-05-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/12725#discussion_r61770411
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -160,12 +160,22 @@ class StreamExecution(
 
       // While active, repeatedly attempt to run batches.
       SQLContext.setActive(sparkSession.wrapped)
-      populateStartOffsets()
-      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+
       triggerExecutor.execute(() => {
         if (isActive) {
-          if (dataAvailable) runBatch()
-          constructNextBatch()
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets()
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          }
+          else {
--- End diff --

nit: merge this line and the previous one.





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-04-29 Thread lw-lin
Github user lw-lin commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-215927916
  
To keep things easy to review, I've added the manual timed executor for 
testing general cases in [a separate 
PR](https://github.com/apache/spark/pull/12797). When that patch gets merged, 
I'll add some dedicated tests here for this patch.
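
As a rough illustration of what a manually timed executor makes possible in tests, here is a minimal sketch. It does not reproduce the code in that PR or Spark's own clock and trigger classes; `FakeClock` and `IntervalTrigger` are hypothetical names. Time moves only when the test advances it, so a fixed-interval trigger fires deterministically.

```scala
// Hypothetical clock whose time changes only when a test advances it.
final class FakeClock(private var now: Long = 0L) {
  def currentTimeMillis: Long = now
  def advance(ms: Long): Unit = now += ms
}

// Hypothetical fixed-interval trigger driven by the fake clock.
final class IntervalTrigger(clock: FakeClock, intervalMs: Long) {
  // First interval boundary strictly after the current time.
  private def nextBoundary(): Long =
    (clock.currentTimeMillis / intervalMs + 1) * intervalMs

  private var nextFireTime: Long = nextBoundary()

  // Runs `batch` once if the next boundary has been reached; returns whether it ran.
  def poll(batch: () => Unit): Boolean =
    if (clock.currentTimeMillis >= nextFireTime) {
      batch()
      nextFireTime = nextBoundary() // boundaries skipped in the meantime are not replayed
      true
    } else false
}
```

With a 10-second interval, a poll at time 0 does nothing, advancing the clock by 10 seconds makes the next poll run one batch, and advancing by a further 30 seconds still yields a single run rather than three.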





[GitHub] spark pull request: [SPARK-14942][SQL][Streaming] Reduce delay bet...

2016-04-28 Thread lw-lin
Github user lw-lin commented on the pull request:

https://github.com/apache/spark/pull/12725#issuecomment-215405018
  
Sure, I'll add a manual timed executor and some dedicated tests as well.

