Repository: spark Updated Branches: refs/heads/master efef55388 -> d0bc3ed67
[SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query ## What changes were proposed in this pull request? `Uuid`'s results depend on the random seed assigned during analysis. Thus, in a streaming query, every execution produces the same UUIDs, which is incorrect for streaming query execution. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #21854 from viirya/uuid_in_streaming. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0bc3ed6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0bc3ed6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0bc3ed6 Branch: refs/heads/master Commit: d0bc3ed6797e0c06f688b7b2ef6c26282a25b175 Parents: efef553 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Aug 2 15:35:46 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Thu Aug 2 15:35:46 2018 -0700 ---------------------------------------------------------------------- .../streaming/IncrementalExecution.scala | 8 ++++++- .../sql/streaming/StreamingQuerySuite.scala | 22 +++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 6ae7f28..e9ffe12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -20,9 +20,11 @@ package 
org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import scala.util.Random + import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp +import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule @@ -73,10 +75,14 @@ class IncrementalExecution( * with the desired literal */ override lazy val optimizedPlan: LogicalPlan = { + val random = new Random() + sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions { case ts @ CurrentBatchTimestamp(timestamp, _, _) => logInfo(s"Current batch timestamp = $timestamp") ts.toLiteral + // SPARK-24896: Set the seed for random number generation in Uuid expressions. 
+ case _: Uuid => Uuid(Some(random.nextLong())) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d0bc3ed6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 78199b0..f37f368 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -21,6 +21,8 @@ import java.{util => ju} import java.util.Optional import java.util.concurrent.CountDownLatch +import scala.collection.mutable + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter @@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter import org.apache.spark.sql.functions._ @@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckLastBatch(("A", 1))) } + test("Uuid in streaming query should not produce same uuids in each execution") { + val uuids = mutable.ArrayBuffer[String]() + def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] => + rows.foreach(r => uuids += r.getString(0)) + } + + val stream = MemoryStream[Int] + val df = stream.toDF().select(new Column(Uuid())) + testStream(df)( + AddData(stream, 1), + CheckAnswer(collectUuid), + AddData(stream, 2), + 
CheckAnswer(collectUuid) + ) + assert(uuids.distinct.size == 2) + } + test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " + "should not fail") { val df = spark.readStream.format("rate").load() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org