[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20445

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r166736890

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }

       private def generateDebugString(
    -      blocks: Iterable[Array[UnsafeRow]],
    +      blocks: Seq[UnsafeRow],
--- End diff --

right! i thought of changing but forgot. my bad.
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r166725859

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }

       private def generateDebugString(
    -      blocks: Iterable[Array[UnsafeRow]],
    +      blocks: Seq[UnsafeRow],
--- End diff --

nit: it's probably more "rows" than "blocks" now
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r166720161

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
@@ -46,49 +46,34 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
             .foreach(new TestForeachWriter())
             .start()

    -      // -- batch 0 ---
    -      input.addData(1, 2, 3, 4)
    -      query.processAllAvailable()
    +      def verifyOutput(expectedVersion: Int, expectedData: Seq[Int]): Unit = {
    +        import ForeachSinkSuite._

    -      var expectedEventsForPartition0 = Seq(
    -        ForeachSinkSuite.Open(partition = 0, version = 0),
    -        ForeachSinkSuite.Process(value = 2),
    -        ForeachSinkSuite.Process(value = 3),
    -        ForeachSinkSuite.Close(None)
    -      )
    -      var expectedEventsForPartition1 = Seq(
    -        ForeachSinkSuite.Open(partition = 1, version = 0),
    -        ForeachSinkSuite.Process(value = 1),
    -        ForeachSinkSuite.Process(value = 4),
    -        ForeachSinkSuite.Close(None)
    -      )
    +        val events = ForeachSinkSuite.allEvents()
--- End diff --

This test assumed that the output would arrive in a specific order after repartitioning, which isn't guaranteed. So I rewrote the test to verify the output in an order-independent way.
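The order-independent check described in this comment could look roughly like the sketch below. It is not the code from the PR: the `Event` types are hypothetical stand-ins for the events recorded in `ForeachSinkSuite`, and the assumption that events arrive grouped per writer (one inner `Seq` per open/close cycle) is ours. The idea is to check per-writer invariants (opened with the expected version, closed without error) and then compare the processed values as a sorted collection, so that neither the partition assignment nor the arrival order matters.

```scala
// Hypothetical stand-ins for the events a test ForeachWriter might record.
sealed trait Event
final case class Open(partition: Long, version: Long) extends Event
final case class Process(value: Int) extends Event
final case class Close(error: Option[Throwable]) extends Event

object OrderIndependentCheck {
  /**
   * Verifies one batch without assuming which partition received which value.
   * Assumption: `eventsPerWriter` holds one event list per writer instance.
   */
  def verifyOutput(
      eventsPerWriter: Seq[Seq[Event]],
      expectedVersion: Long,
      expectedData: Seq[Int]): Unit = {
    eventsPerWriter.foreach { events =>
      // Every writer must start with Open carrying the expected version.
      events.headOption match {
        case Some(Open(_, version)) =>
          assert(version == expectedVersion, s"unexpected version $version")
        case other =>
          throw new AssertionError(s"expected Open as first event, got $other")
      }
      // Every writer must end with a clean Close.
      assert(events.lastOption.contains(Close(None)), "writer did not close cleanly")
    }
    // Compare processed values as sorted sequences: order and partition are ignored.
    val processed = eventsPerWriter.flatten.collect { case Process(v) => v }
    assert(processed.sorted == expectedData.sorted, s"unexpected data $processed")
  }
}
```

Sorting both sides, rather than converting to a `Set`, keeps duplicate values significant, which matters if the same value can legitimately be added to the stream more than once.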
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r165543767

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       def addData(data: TraversableOnce[A]): Offset = {
         val encoded = data.toVector.map(d => encoder.toRow(d).copy())
    -    val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
    +    val plan = new LocalRelation(attributes, encoded, isStreaming = false)
         val ds = Dataset[A](sqlContext.sparkSession, plan)
         logDebug(s"Adding ds: $ds")
--- End diff --

Good point.
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r165222931

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -270,16 +270,17 @@ class MicroBatchExecution(
           }
         case s: MicroBatchReader =>
           updateStatusMessage(s"Getting offsets from $s")
    -      reportTimeTaken("getOffset") {
    -      // Once v1 streaming source execution is gone, we can refactor this away.
    -      // For now, we set the range here to get the source to infer the available end offset,
    -      // get that offset, and then set the range again when we later execute.
    -      s.setOffsetRange(
    -        toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -        Optional.empty())
    -
    -      (s, Some(s.getEndOffset))
    +      reportTimeTaken("setOffsetRange") {
--- End diff --

I agree that the old metric names don't make much sense anymore, but I worry about changing external-facing behavior as part of an API migration.
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r165522181

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       def addData(data: TraversableOnce[A]): Offset = {
         val encoded = data.toVector.map(d => encoder.toRow(d).copy())
    -    val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
    +    val plan = new LocalRelation(attributes, encoded, isStreaming = false)
         val ds = Dataset[A](sqlContext.sparkSession, plan)
         logDebug(s"Adding ds: $ds")
--- End diff --

Do we still need to store the batches as datasets, now that we're just collect()ing them back out in createDataReaderFactories()?
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20445#discussion_r164933558

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala ---
@@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._
     trait DataSourceReaderHolder {

       /**
    -   * The full output of the data source reader, without column pruning.
    +   * The output of the data source reader, without column pruning.
        */
    -  def fullOutput: Seq[AttributeReference]
--- End diff --

@cloud-fan This fixes the bug I spoke to you offline about. The target of this PR is only master, not 2.3. So if you want to have this fix in 2.3.0, please make a separate PR accordingly.
[GitHub] spark pull request #20445: [SPARK-23092][SQL] Migrate MemoryStream to DataSo...
GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20445

    [SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs

    ## What changes were proposed in this pull request?

    This PR migrates the MemoryStream to DataSourceV2 APIs.

    It fixes a few things along the way.
    1. Fixed a bug in DataSourceV2ScanExec that prevented it from being canonicalized. The fix is required for some tests to pass (StreamingDeduplicateSuite).
    2. Changed the reported keys in StreamingQueryProgress.durationMs.
      - "getOffset" and "getBatch" are replaced with "setOffsetRange" and "getEndOffset", because tracking those makes more sense. Unit tests changed accordingly.

    ## How was this patch tested?

    Existing unit tests, a few updated unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23092

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20445

commit 7c09b376eef6a4e6c118c78ad9459cb55e59e67f
Author: Burak Yavuz
Date:   2018-01-11T16:44:19Z

    save for so far

commit 78c50f860aa13f569669f4ad77f4325d80085c8b
Author: Burak Yavuz
Date:   2018-01-12T18:27:49Z

    Save so far

commit 2777b5b38596a1fb68bcf8ee928aec1a58dc372c
Author: Burak Yavuz
Date:   2018-01-13T01:43:03Z

    save so far

commit 50a541b5890f328a655a7ef1fca4f8480b9a35f0
Author: Burak Yavuz
Date:   2018-01-16T19:14:08Z

    Compiles and I think also runs correctly

commit fd61724c6afcab5831fe8c602ad134d0c473184b
Author: Burak Yavuz
Date:   2018-01-16T19:25:39Z

    save

commit 7a0b564bd0c74525ebcea55b31f9658b1c2f0e12
Author: Burak Yavuz
Date:   2018-01-16T19:28:31Z

    fix merge conflicts

commit a81c2ecdafd54a2c5bfb07c6f1f53546eaa96c7c
Author: Burak Yavuz
Date:   2018-01-16T22:26:28Z

    fix hive

commit 1a4f4108118d976857778916b18499b4e0bf140c
Author: Tathagata Das
Date:   2018-01-27T01:11:01Z

    Undo changes to HiveSessionStateBuilder.scala

commit 083e93c26fd2d1e8c4c738b251a27724115a0001
Author: Tathagata Das
Date:   2018-01-27T01:11:06Z

    Merge remote-tracking branch 'apache-github/master' into HEAD

commit a817c8d40e4ecaf5e4e0c46f43313c5cceeec54e
Author: Tathagata Das
Date:   2018-01-29T22:27:22Z

    Fixed the setOffsetRange bug

commit 35b8854ae466e0313ff926cc1efb8c423d3eefea
Author: Tathagata Das
Date:   2018-01-30T20:42:56Z

    Fixed DataSourceV2ScanExec canonicalization bug

commit e66d809fe501b19b923a88d1b4cb9df69b4ae329
Author: Tathagata Das
Date:   2018-01-31T00:57:59Z

    Fixed metrics reported by MicroBatchExecution