[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21152


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184322699
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

Got it. Looks like we could reduce the range and list out the literals. Will update.
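
For reference, a minimal sketch of the literal-style check being discussed, assuming the same test structure as the diff above (illustrative only, not the exact committed code):

```scala
val input = ContinuousMemoryStream[Int]
val df = input.toDF().map(_.getInt(0) * 2)

testStream(df)(
  AddData(input, 0, 1, 2),
  CheckAnswer(0, 2, 4),            // literals in place of 0.to(2).map(_ * 2)
  StopStream,
  AddData(input, 3, 4, 5),
  StartStream(),
  CheckAnswer(0, 2, 4, 6, 8, 10))  // literals in place of 0.to(5).map(_ * 2)
```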


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184321762
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
--- End diff --

Thanks for the pointer. I started with `(0 to 5)`, but the Spark Scala style guide says to avoid infix notation, so I was a bit puzzled (I wasn't sure whether `to` counts as an operator). Will update.
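
For reference, `to` is an ordinary method on `Int` (via `RichInt`), so both spellings build the same `Range`, and `: _*` just expands a sequence into a varargs parameter. A minimal illustration in plain Scala, independent of the test harness:

```scala
val xs = 0.to(5)  // method-call notation: Range 0, 1, 2, 3, 4, 5
val ys = 0 to 5   // infix notation: the same Range

def sum(ns: Int*): Int = ns.sum  // a varargs parameter
sum(xs: _*)                      // `: _*` passes the Range as individual args
```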


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184316769
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

I'm in favor of @jose-torres's comment. I'd prefer to list out all the results when the list is short.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184315510
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
--- End diff --

Nit: the change seems unnecessary. Also, in Spark we usually use the style `(1 to 5): _*` instead.
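
For illustration, the suggested style applied to the quoted hunk would read as below (a sketch only):

```scala
testStream(input.toDF())(
  AddData(input, (0 to 2): _*),
  CheckAnswer((0 to 2): _*),
  StopStream,
  AddData(input, (3 to 5): _*),
  StartStream(),
  CheckAnswer((0 to 5): _*))
```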


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184236837
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
+      StopStream,
+      AddData(input, 3.to(5): _*),
+      StartStream(),
+      CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+      StopStream,
+      AddData(input, 3.to(5): _*),
+      StartStream(),
+      CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
  }
 
   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 5)
--- End diff --

@jose-torres What do you think about this? Would it be better to have tests for both the untyped and the typed variants? The code duplication wouldn't be huge, since I guess the verification logic can be reused for every test.
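
For illustration, covering both variants could look like the sketch below, assuming the harness from the diff (`ContinuousMemoryStream`, `testStream`) plus `import spark.implicits._` for the `'value` column syntax:

```scala
val input = ContinuousMemoryStream[Int]
val untyped = input.toDF().where('value > 5)      // untyped Column predicate
val typed = input.toDF().filter(_.getInt(0) > 5)  // typed Row => Boolean predicate
```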


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184213878
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

@jose-torres
Yeah, my intention was to ensure that Spark operations work the same as Scala collection methods, but sure, enumerating is also fine since the expected result is easy to see.
Are you in favor of enumerating the literals we already know, instead of computing them, for all the tests? Or only for this line? I'd just like to apply the approach consistently.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184121867
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

nit: I'd just write out the literals we're expecting rather than duplicating the multiplication; it's about the same amount of code either way.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184006570
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
     val input = ContinuousMemoryStream[Int]
 
     testStream(input.toDF())(
-      AddData(input, 0, 1, 2),
-      CheckAnswer(0, 1, 2),
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2): _*),
       StopStream,
-      AddData(input, 3, 4, 5),
+      AddData(input, 3.to(5): _*),
       StartStream(),
-      CheckAnswer(0, 1, 2, 3, 4, 5))
+      CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).map(_ * 2): _*),
+      StopStream,
+      AddData(input, 3.to(5): _*),
+      StartStream(),
+      CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0.to(2): _*),
+      CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+      StopStream,
+      AddData(input, 3.to(5): _*),
+      StartStream(),
+      CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 5)
--- End diff --

I intended to use the untyped filter because of SPARK-24061. Once #21136 is merged we could change this, but I'm not sure we want to have both untyped and typed variants for every test.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21152

[SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with the memory source in continuous mode. Tests keep using the "rate" source if they rely on data being added periodically in the background, or if they need a short source name, since "memory" does not have a provider registered for the source side.
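
A minimal sketch of the substitution, assuming a suite that provides `spark`, an implicit `SQLContext`, and `import spark.implicits._` (as the streaming test harness does):

```scala
// Before: the rate source emits rows on a wall-clock schedule, so tests must
// wait for background triggers before checking results.
val rateDf = spark.readStream
  .format("rate")
  .option("numPartitions", "5")
  .option("rowsPerSecond", "5")
  .load()
  .select('value)

// After: ContinuousMemoryStream lets the test inject data deterministically
// with AddData; no background timing is involved.
val input = ContinuousMemoryStream[Int]
val memDf = input.toDF()
```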

## How was this patch tested?

Ran the relevant test suites from the IDE.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-23688

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21152


commit 5aac856b3ef0118d174f016fc6a476f0facf174b
Author: Jungtaek Lim 
Date:   2018-04-25T09:46:30Z

[SPARK-23688][SS] Refactor tests away from rate source

* replace rate source with memory source in continuous mode




---
