[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20445


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r166736890
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   private def generateDebugString(
-  blocks: Iterable[Array[UnsafeRow]],
+  blocks: Seq[UnsafeRow],
--- End diff --

Right! I thought of changing it but forgot. My bad.


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r166725859
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   private def generateDebugString(
-  blocks: Iterable[Array[UnsafeRow]],
+  blocks: Seq[UnsafeRow],
--- End diff --

nit: it's probably more "rows" than "blocks" now
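
A sketch of the suggested rename, for context (the ordinal parameters and the body are assumptions for illustration, relying on the surrounding MemoryStream members; they are not taken from the diff):

private def generateDebugString(
    rows: Seq[UnsafeRow],
    startOrdinal: Int,
    endOrdinal: Int): String = {
  // Decode the rows back into objects purely for a readable debug message.
  val fromRow = encoder.resolveAndBind().fromRow _
  s"MemoryBatch [$startOrdinal, $endOrdinal]: " +
    rows.map(row => fromRow(row)).mkString(", ")
}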


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-07 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r166720161
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
@@ -46,49 +46,34 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
 .foreach(new TestForeachWriter())
 .start()
 
-  // -- batch 0 ---
-  input.addData(1, 2, 3, 4)
-  query.processAllAvailable()
+  def verifyOutput(expectedVersion: Int, expectedData: Seq[Int]): Unit = {
+import ForeachSinkSuite._
 
-  var expectedEventsForPartition0 = Seq(
-ForeachSinkSuite.Open(partition = 0, version = 0),
-ForeachSinkSuite.Process(value = 2),
-ForeachSinkSuite.Process(value = 3),
-ForeachSinkSuite.Close(None)
-  )
-  var expectedEventsForPartition1 = Seq(
-ForeachSinkSuite.Open(partition = 1, version = 0),
-ForeachSinkSuite.Process(value = 1),
-ForeachSinkSuite.Process(value = 4),
-ForeachSinkSuite.Close(None)
-  )
+val events = ForeachSinkSuite.allEvents()
--- End diff --

This test assumed that the output would arrive in a specific order after 
repartitioning, which isn't guaranteed. So I rewrote the test to verify the 
output in an order-independent way.
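
For context, a minimal sketch of such an order-independent check (it leans on the event helpers of ForeachSinkSuite shown in the diff; the exact assertions are illustrative, not necessarily the PR's final code):

def verifyOutput(expectedVersion: Int, expectedData: Seq[Int]): Unit = {
  import ForeachSinkSuite._

  val events = ForeachSinkSuite.allEvents()
  // Within each partition the ordering is guaranteed: an Open with the
  // expected version first, a clean Close last.
  events.foreach { partitionEvents =>
    partitionEvents.head match {
      case Open(_, version) => assert(version === expectedVersion)
      case other => fail(s"unexpected first event: $other")
    }
    assert(partitionEvents.last === Close(None))
  }
  // Across partitions, compare the processed values as a set, since
  // row-to-partition assignment after repartitioning is not deterministic.
  val processed = events.flatten.collect { case Process(value) => value }
  assert(processed.toSet === expectedData.toSet)
}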


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r165543767
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   def addData(data: TraversableOnce[A]): Offset = {
 val encoded = data.toVector.map(d => encoder.toRow(d).copy())
-val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+val plan = new LocalRelation(attributes, encoded, isStreaming = false)
 val ds = Dataset[A](sqlContext.sparkSession, plan)
 logDebug(s"Adding ds: $ds")
--- End diff --

Good point. 


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r165222931
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -270,16 +270,17 @@ class MicroBatchExecution(
 }
   case s: MicroBatchReader =>
 updateStatusMessage(s"Getting offsets from $s")
-reportTimeTaken("getOffset") {
-// Once v1 streaming source execution is gone, we can refactor this away.
-// For now, we set the range here to get the source to infer the available end offset,
-// get that offset, and then set the range again when we later execute.
-s.setOffsetRange(
-  toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-  Optional.empty())
-
-  (s, Some(s.getEndOffset))
+reportTimeTaken("setOffsetRange") {
--- End diff --

I agree that the old metric names don't make much sense anymore, but I 
worry about changing external-facing behavior as part of an API migration.


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-02-01 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r165522181
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   def addData(data: TraversableOnce[A]): Offset = {
 val encoded = data.toVector.map(d => encoder.toRow(d).copy())
-val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
+val plan = new LocalRelation(attributes, encoded, isStreaming = false)
 val ds = Dataset[A](sqlContext.sparkSession, plan)
 logDebug(s"Adding ds: $ds")
--- End diff --

Do we still need to store the batches as datasets, now that we're just 
collect()ing them back out in createDataReaderFactories()?
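
For illustration, the simplification implied by the question might look roughly like this (a hedged sketch relying on the surrounding MemoryStream members such as encoder and currentOffset; the exact bookkeeping is an assumption, not the PR's code):

import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ListBuffer

// Keep the encoded rows directly instead of wrapping each batch in a
// Dataset, since the reader just collect()s them back out anyway.
@GuardedBy("this")
private val batches = new ListBuffer[Array[UnsafeRow]]

def addData(data: TraversableOnce[A]): Offset = synchronized {
  // copy() each row because the encoder reuses its output buffer.
  val rows = data.toVector
    .map(d => encoder.toRow(d).copy().asInstanceOf[UnsafeRow])
    .toArray
  currentOffset = currentOffset + 1
  batches += rows
  currentOffset
}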


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20445#discussion_r164933558
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala ---
@@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._
 trait DataSourceReaderHolder {
 
   /**
-   * The full output of the data source reader, without column pruning.
+   * The output of the data source reader, without column pruning.
   */
-  def fullOutput: Seq[AttributeReference]
--- End diff --

@cloud-fan This fixes the bug I spoke to you about offline. This PR targets 
only master, not 2.3, so if you want this fix in 2.3.0, please make a 
separate PR accordingly.


---

[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...

2018-01-30 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20445

[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs

## What changes were proposed in this pull request?

This PR migrates MemoryStream to the DataSourceV2 APIs. It fixes a few 
things along the way.
1. Fixed a bug in DataSourceV2ScanExec that prevented it from being 
canonicalized, which is required for some tests to pass (StreamingDeduplicateSuite).
2. Changed the reported keys in StreamingQueryProgress.durationMs.
  - "getOffset" and "getBatch" replaced with "setOffsetRange" and 
"getEndOffset", since tracking those makes more sense. Unit tests changed 
accordingly; see the sketch below.
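
For reference, these keys are user-visible through the progress API; a minimal sketch of how to observe them (assuming a started StreamingQuery named `query`):

import scala.collection.JavaConverters._

// durationMs now reports "setOffsetRange" and "getEndOffset" where it
// previously reported "getOffset" and "getBatch".
val timings = query.lastProgress.durationMs.asScala
timings.foreach { case (metric, millis) => println(s"$metric: $millis ms") }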

## How was this patch tested?
Existing unit tests, plus a few updated unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23092

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20445


commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f
Author: Burak Yavuz 
Date:   2018-01-11T16:44:19Z

save for so far

commit 78c50f860aa13f569669f4ad77f4325d80085c8b
Author: Burak Yavuz 
Date:   2018-01-12T18:27:49Z

Save so far

commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c
Author: Burak Yavuz 
Date:   2018-01-13T01:43:03Z

save so far

commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0
Author: Burak Yavuz 
Date:   2018-01-16T19:14:08Z

Compiles and I think also runs correctly

commit fd61724c6afcab5831fe8c602ad134d0c473184b
Author: Burak Yavuz 
Date:   2018-01-16T19:25:39Z

save

commit 7a0b564bd0c74525ebcea55b31f9658b1c2f0e12
Author: Burak Yavuz 
Date:   2018-01-16T19:28:31Z

fix merge conflicts

commit a81c2ecdafd54a2c5bfb07c6f1f53546eaa96c7c
Author: Burak Yavuz 
Date:   2018-01-16T22:26:28Z

fix hive

commit 1a4f4108118d976857778916b18499b4e0bf140c
Author: Tathagata Das 
Date:   2018-01-27T01:11:01Z

Undo changes to HiveSessionStateBuilder.scala

commit 083e93c26fd2d1e8c4c738b251a27724115a0001
Author: Tathagata Das 
Date:   2018-01-27T01:11:06Z

Merge remote-tracking branch 'apache-github/master' into HEAD

commit a817c8d40e4ecaf5e4e0c46f43313c5cceeec54e
Author: Tathagata Das 
Date:   2018-01-29T22:27:22Z

Fixed the setOffsetRange bug

commit 35b8854ae466e0313ff926cc1efb8c423d3eefea
Author: Tathagata Das 
Date:   2018-01-30T20:42:56Z

Fixed DataSourceV2ScanExec canonicalization bug

commit e66d809fe501b19b923a88d1b4cb9df69b4ae329
Author: Tathagata Das 
Date:   2018-01-31T00:57:59Z

Fixed metrics reported by MicroBatchExecution




---
