[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21152

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184322699

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }

   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)

-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff ---

Got it. Looks like we can reduce the range and list out literals. Will update.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184321762

(Same ContinuousSuite.scala hunk as above, commenting on the line `CheckAnswer(0.to(5): _*))`.)

Thanks for the pointer. I started from `(0 to 5)`, but the Spark Scala style guide mentions avoiding infix notation, so I was a bit puzzled (I was not sure whether `to` counts as an operator). Will update.
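The style point being debated here — dot notation `0.to(2)` versus infix `(0 to 2)`, and the `: _*` splat into a varargs parameter — can be checked in plain Scala, independent of Spark. A minimal sketch; the `collect` helper is hypothetical, standing in for varargs test helpers such as `AddData`/`CheckAnswer`:

```scala
object SplatExample {
  // Hypothetical varargs helper, standing in for AddData/CheckAnswer.
  def collect(xs: Int*): Seq[Int] = xs.toSeq

  def main(args: Array[String]): Unit = {
    // `to` is an ordinary method on Int, so dot notation and infix
    // notation build exactly the same Range.
    assert(0.to(2) == (0 to 2))

    // `: _*` expands the sequence into the varargs parameter,
    // equivalent to passing the literals one by one.
    assert(collect(0.to(2): _*) == collect(0, 1, 2))
    assert(collect((3 to 5): _*) == Seq(3, 4, 5))
  }
}
```

Either spelling of the range yields the same argument list, which is why the choice comes down to the style guide rather than behavior.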
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184316769

(Same ContinuousSuite.scala hunk as above, commenting on the line `CheckAnswer(0.to(2).map(_ * 2): _*),`.)

I'm in favor of @jose-torres's comment: I'd prefer to list out all the results when the list is short.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184315510

(Same ContinuousSuite.scala hunk as above, commenting on the line `CheckAnswer(0.to(5): _*))`.)

Nit: these changes seem unnecessary. Also, in Spark we usually use the style `(1 to 5): _*` instead.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184236837

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
(Same leading hunk as above, continuing through the "flatMap" and "filter" tests:)

   test("flatMap") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))

-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+      StopStream,
+      AddData(input, 3.to(5): _*),
+      StartStream(),
+      CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }

   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 5)
--- End diff ---

@jose-torres What do you think about this? Would it be better to have tests for both the untyped and the typed variants? The code duplication would not be huge, since the verification logic can be reused across tests.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184213878

(Same ContinuousSuite.scala hunk as above, commenting on the line `CheckAnswer(0.to(2).map(_ * 2): _*),`.)

@jose-torres Yeah, my intention was to ensure Spark operations work the same as the Scala collection methods, but sure, enumerating is also fine since the result is easy to see. Are you in favor of enumerating the literals we already know for all of the tests, or just for this line? I'd like to apply the approach consistently.
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184121867

(Same ContinuousSuite.scala hunk as above, commenting on the line `CheckAnswer(0.to(2).map(_ * 2): _*),`.)

nit: I'd just write out the literals we're expecting rather than duplicating the multiplication; it's about the same amount of code either way.
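The trade-off being discussed — deriving the expected answers by repeating the map logic versus spelling out the literals — can be seen in isolation, since both forms produce the same argument list for `CheckAnswer`. A plain-Scala sketch, no Spark needed:

```scala
object ExpectationStyles {
  def main(args: Array[String]): Unit = {
    // Style 1: derive the expectation by repeating the transformation.
    val computed = 0.to(5).map(_ * 2)

    // Style 2: write out the literals we expect to see.
    val literals = Seq(0, 2, 4, 6, 8, 10)

    // Both splat to the same varargs; the choice is about readability.
    // Literals make the expectation obvious at a glance; computing it
    // avoids drift if the input range later changes.
    assert(computed == literals)
  }
}
```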
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184006570

(Same ContinuousSuite.scala hunk as above, commenting on the "filter" test change:)

   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 5)
--- End diff ---

I intended to use the untyped filter because of SPARK-24061. Once #21136 is merged we could change this, but I'm not sure we want both untyped and typed variants for every test.
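For reference, the untyped/typed distinction under discussion: `where('value > 5)` uses the untyped Dataset API, where the predicate is a `Column` expression, while a typed filter takes an ordinary Scala function. A sketch of the two variants, assuming an `input` like `ContinuousMemoryStream[Int]` whose DataFrame has a single `value` column; this is not runnable outside a Spark session:

```scala
// Untyped: the predicate is a Column expression. Catalyst can inspect
// and optimize it, but the column reference is only resolved at
// analysis time.
val untyped = input.toDF().where('value > 5)

// Typed: the predicate is a plain Scala function on the element type.
// It is checked at compile time, but is opaque to the optimizer.
val typed = input.toDF().as[Int].filter(_ > 5)
```

SPARK-24061 concerned typed filters in continuous processing, which is why the untyped form was chosen here.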
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21152 [SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with a memory source in continuous mode. Keep using the "rate" source where a test relies on data being produced periodically in the background, or needs a short source name, since "memory" does not have a source provider.

## How was this patch tested?

Ran the relevant test suites from the IDE.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-23688

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21152.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21152

commit 5aac856b3ef0118d174f016fc6a476f0facf174b
Author: Jungtaek Lim
Date: 2018-04-25T09:46:30Z

    [SPARK-23688][SS] Refactor tests away from rate source

    * replace rate source with memory source in continuous mode