Repository: spark Updated Branches: refs/heads/master 4f7ec3a31 -> cce25b360
[SPARK-21565][SS] Propagate metadata in attribute replacement. ## What changes were proposed in this pull request? Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes. ## How was this patch tested? new unit test, which was verified to fail before the fix Author: Jose Torres <joseph-tor...@databricks.com> Closes #18840 from joseph-torres/SPARK-21565. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cce25b36 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cce25b36 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cce25b36 Branch: refs/heads/master Commit: cce25b360ee9e39d9510134c73a1761475eaf4ac Parents: 4f7ec3a Author: Jose Torres <joseph-tor...@databricks.com> Authored: Mon Aug 7 12:27:16 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Mon Aug 7 12:27:16 2017 -0700 ---------------------------------------------------------------------- .../execution/streaming/StreamExecution.scala | 3 ++- .../sql/streaming/EventTimeWatermarkSuite.scala | 28 ++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5711262..1528e7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -628,7 +628,8 @@ class StreamExecution( // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) val triggerLogicalPlan = withNewSources transformAllExpressions { - case a: Attribute if replacementMap.contains(a) => replacementMap(a) + case a: Attribute if replacementMap.contains(a) => + replacementMap(a).withMetadata(a.metadata) case ct: CurrentTimestamp => CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 552911f..4f19fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche checkDataset[Long](df, 1L to 100L: _*) } + test("SPARK-21565: watermark operator accepts attributes from replacement") { + withTempDir { dir => + dir.delete() + + val df = Seq(("a", 100.0, new java.sql.Timestamp(100L))) + .toDF("symbol", "price", "eventTime") + df.write.json(dir.getCanonicalPath) + + val input = spark.readStream.schema(df.schema) + .json(dir.getCanonicalPath) + + val groupEvents = input + .withWatermark("eventTime", "2 seconds") + .groupBy("symbol", "eventTime") + .agg(count("price") as 'count) + .select("symbol", "eventTime", "count") + val q = groupEvents.writeStream + .outputMode("append") + .format("console") + .start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org