[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-07-06 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r126029620
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def prepareMemoryStream(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    prepareMemoryStream()
+    withTempDir { dir =>
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
--- End diff --

https://github.com/apache/spark/pull/18503#discussion_r126028838





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17216





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106759443
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -256,6 +259,15 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+
+      // Isolated spark session to run the batches with.
+      val sparkSessionToRunBatches = sparkSession.cloneSession()
+      // Adaptive execution can change num shuffle partitions, disallow
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
--- End diff --

Yeah, this should be kept. It should use the conf in the cloned session.
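
For context, a minimal sketch of the direction being discussed (assuming the surrounding `StreamExecution` scope; not the exact PR code): the recorded value comes from the cloned session, so later conf changes on the user-facing session cannot leak into a running query.

```Scala
// Clone the session so batches run with an isolated conf; disable adaptive
// execution on the clone since it can change the number of shuffle partitions.
val sparkSessionToRunBatches = sparkSession.cloneSession()
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")

// Build the initial metadata from the cloned session's conf, not the original's.
offsetSeqMetadata = OffsetSeqMetadata(
  batchWatermarkMs = 0,
  batchTimestampMs = 0,
  conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
    sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))
```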





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106757213
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -256,6 +259,15 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+
+      // Isolated spark session to run the batches with.
+      val sparkSessionToRunBatches = sparkSession.cloneSession()
+      // Adaptive execution can change num shuffle partitions, disallow
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
--- End diff --

nit: remove line.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106724958
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala ---
@@ -29,12 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
   case class StringOffset(override val json: String) extends Offset
 
   test("OffsetSeqMetadata - deserialization") {
-    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
-    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
-    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
-    assert(
-      OffsetSeqMetadata(1, 2) ===
-        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+    def getConfWith(shufflePartitions: Int): Map[String, String] = {
+      Map(key -> shufflePartitions.toString)
+    }
+
+    // None set
+    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+    // One set
+    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+    // Two set
+    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // All set
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
--- End diff --

Added.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106724948
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -549,9 +581,15 @@ class StreamExecution(
   cd.dataType, cd.timeZoneId)
 }
 
+// Reset confs to disallow change in number of partitions
--- End diff --

Good point, changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106709281
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -549,9 +581,15 @@ class StreamExecution(
   cd.dataType, cd.timeZoneId)
 }
 
+// Reset confs to disallow change in number of partitions
--- End diff --

Why do we need to set the confs for every batch? You can set them once after recovering `offsetSeqMetadata`.
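
A sketch of that suggestion (assuming the `StreamExecution` scope above, where `sparkSessionToRunBatches` is the cloned session used to run batches):

```Scala
// Restore the recorded confs once, right after offsetSeqMetadata has been
// recovered from the offset log, rather than re-setting them on every batch.
offsetSeqMetadata.conf.foreach { case (k, v) =>
  sparkSessionToRunBatches.conf.set(k, v)
}
```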





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106709791
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala ---
@@ -29,12 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
   case class StringOffset(override val json: String) extends Offset
 
   test("OffsetSeqMetadata - deserialization") {
-    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
-    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
-    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
-    assert(
-      OffsetSeqMetadata(1, 2) ===
-        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+    def getConfWith(shufflePartitions: Int): Map[String, String] = {
+      Map(key -> shufflePartitions.toString)
+    }
+
+    // None set
+    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+    // One set
+    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+    // Two set
+    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // All set
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
--- End diff --

nit: could you add a test to verify that unknown fields don't break the deserialization? Such as
```Scala
assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
  OffsetSeqMetadata(
    s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3},"unknown":1}"""))
```
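
For reference, a standalone sketch of why such a test should pass, assuming the metadata is deserialized via json4s extraction (the `Meta` case class below is a stand-in for `OffsetSeqMetadata`): fields with no counterpart in the case class are simply ignored.

```Scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Stand-in for OffsetSeqMetadata, used only for this illustration.
case class Meta(batchWatermarkMs: Long = 0, batchTimestampMs: Long = 0)

implicit val formats: DefaultFormats.type = DefaultFormats

// The extra "unknown" key is dropped during extraction instead of failing.
val meta = parse("""{"batchWatermarkMs":1,"batchTimestampMs":2,"unknown":1}""")
  .extract[Meta]
assert(meta == Meta(batchWatermarkMs = 1, batchTimestampMs = 2))
```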





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106285230
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +387,27 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // initialize metadata
+        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+        offsetSeqMetadata = {
+          if (nextOffsets.metadata.isEmpty) {
+            OffsetSeqMetadata(0, 0,
+              Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString))
+          } else {
+            val metadata = nextOffsets.metadata.get
+            val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+              // For backward compatibility, if # partitions was not recorded in the offset log,
+              // then ensure it is not missing. The new value is picked up from the conf.
+              logDebug("Number of shuffle partitions from previous run not found in checkpoint. "
--- End diff --

Changed to a log warning.
Rechecked the semantics: it works as expected, and the warning is printed only at the time of the first upgrade.
Once we restart a query from a v2.1 checkpoint and then stop it, any new offsets written out will contain the number of shuffle partitions. Any future restart will read these new offsets in `StreamExecution.populateStartOffsets -> offsetLog.getLatest` and pick up the recorded number of shuffle partitions.
Worth noting for future reference: we do not rewrite the old offset files to contain the number of shuffle partitions; the semantics are still correct because of the call to `offsetLog.getLatest`.
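
Put together, a sketch of that recovery path (assuming the `StreamExecution` scope from the diff, where `metadata` is the recovered `OffsetSeqMetadata` and `shufflePartitionsSparkSession` is the current session value):

```Scala
// Pre-2.2 checkpoints have no recorded value, so fall back to the current
// session conf and warn. The fallback is hit only on the first restart after
// upgrading; every offset written afterwards records the value, so later
// restarts take the getOrElse fast path via offsetLog.getLatest.
val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
  logWarning("Number of shuffle partitions from previous run not found in checkpoint. " +
    s"Using the value from the conf: $shufflePartitionsSparkSession.")
  shufflePartitionsSparkSession.toString
})
offsetSeqMetadata = metadata.copy(
  conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse))
```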





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269036
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
--- End diff --

I see, removed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106266808
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +387,27 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // initialize metadata
+        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+        offsetSeqMetadata = {
+          if (nextOffsets.metadata.isEmpty) {
+            OffsetSeqMetadata(0, 0,
--- End diff --

Changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269742
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    })
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    init()
+    withTempDir(dir => {
--- End diff --

Changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269427
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
--- End diff --

Changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269722
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
--- End diff --

Changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106266038
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    })
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    init()
+    withTempDir(dir => {
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Since the number of partitions is greater than 10, should throw exception.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          intercept[StreamingQueryException] {
--- End diff --

https://github.com/apache/spark/pull/17216#discussion_r106033678





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106267528
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -437,25 +464,28 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // Current batch timestamp in milliseconds
-      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
       // Update the eventTime watermark if we find one in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
             e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
-          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+          if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+            batchWatermarkMs = newWatermarkMs
           } else {
             logDebug(
               s"Event time didn't move: $newWatermarkMs < " +
-                s"${offsetSeqMetadata.batchWatermarkMs}")
+                s"$batchWatermarkMs")
           }
         }
       }
+      offsetSeqMetadata = OffsetSeqMetadata(
--- End diff --

Good point, changed.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106059496
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
--- End diff --

nit: `withTempDir { dir => `





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106059467
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    })
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    init()
+    withTempDir(dir => {
--- End diff --

nit: `withTempDir { dir =>`





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106059434
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    })
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    init()
+    withTempDir(dir => {
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Since the number of partitions is greater than 10, should throw exception.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          intercept[StreamingQueryException] {
--- End diff --

what is the error message?





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106059371
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
--- End diff --

This is not directly tied to SPARK-19873. Remove the SPARK-19873 prefix; just "recover from a Spark v2.1 checkpoint".





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106059096
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
--- End diff --

nit: init -> prepareMemoryStream





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106058976
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -437,25 +464,28 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // Current batch timestamp in milliseconds
-      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
       // Update the eventTime watermark if we find one in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
             e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
-          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+          if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+            batchWatermarkMs = newWatermarkMs
          } else {
             logDebug(
               s"Event time didn't move: $newWatermarkMs < " +
-                s"${offsetSeqMetadata.batchWatermarkMs}")
+                s"$batchWatermarkMs")
          }
        }
      }
+      offsetSeqMetadata = OffsetSeqMetadata(
--- End diff --

You can make this `offsetSeqMetadata.copy(batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis())`
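
Since `OffsetSeqMetadata` is a case class, `copy` returns a new instance with only the named fields changed, so the recorded conf map carries over untouched. A small standalone illustration (field names from the PR; the values are made up):

```Scala
case class OffsetSeqMetadata(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

val before = OffsetSeqMetadata(
  batchWatermarkMs = 5, batchTimestampMs = 100,
  conf = Map("spark.sql.shuffle.partitions" -> "10"))

// copy() leaves unnamed fields alone, so conf is preserved across batches.
val after = before.copy(batchWatermarkMs = 7, batchTimestampMs = 200)
assert(after.conf == before.conf)
```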





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106058765
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +387,27 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // initialize metadata
+        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+        offsetSeqMetadata = {
+          if (nextOffsets.metadata.isEmpty) {
+            OffsetSeqMetadata(0, 0,
+              Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString))
+          } else {
+            val metadata = nextOffsets.metadata.get
+            val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+              // For backward compatibility, if # partitions was not recorded in the offset log,
+              // then ensure it is not missing. The new value is picked up from the conf.
+              logDebug("Number of shuffle partitions from previous run not found in checkpoint. "
--- End diff --

Make this a log warning so that we can debug. It should be printed only once, at the time of upgrading for the first time.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106057080
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +387,27 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // initialize metadata
+        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+        offsetSeqMetadata = {
+          if (nextOffsets.metadata.isEmpty) {
+            OffsetSeqMetadata(0, 0,
--- End diff --

nit: can you make this call with named params
 ```
OffsetSeqMetadata(
   batchWatermarkMs = 0,  
   ...
)
``` 





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106043061
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+    val inputData = MemoryStream[Int]
+    inputData.addData(1, 2, 3, 4)
+    inputData.addData(3, 4, 5, 6)
+    inputData.addData(5, 6, 7, 8)
+
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri).getCanonicalPath
+    val query = inputData
+      .toDF()
+      .groupBy($"value")
+      .agg(count("*"))
+      .writeStream
+      .queryName("counts")
+      .outputMode("complete")
+      .option("checkpointLocation", checkpointDir)
+      .format("memory")
+
+    // Checkpoint data was generated by a query with 10 shuffle partitions.
+    // Test if recovery from checkpoint is successful.
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+      query.start().processAllAvailable()
--- End diff --

Added.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105792431
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -70,13 +69,16 @@ object OffsetSeq {
  * bound the lateness of data that will processed. Time unit: milliseconds
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
+ * @param conf: Additional confs to be persisted across batches, e.g. number of shuffle partitions.
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+    var batchWatermarkMs: Long = 0,
--- End diff --

Changed to vals.





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106033678
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
+
+// If the number of partitions is greater, should throw exception.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
--- End diff --

Seems okay to me. Underlying cause is `FileNotFoundException`. Error message indicates _Error reading delta file /Users/path/to/checkpoint/state/[operator]/[partition]/[batch].delta_
> [info] - SPARK-19873: backward compatibility - recover with wrong num shuffle partitions *** FAILED *** (12 seconds, 98 milliseconds)
[info]   org.apache.spark.sql.streaming.StreamingQueryException: Query badQuery [id = dddc5e7f-1e71-454c-8362-de18fb5a, runId = b2960c74-257a-4eb1-b242-61d13e20655f] terminated with exception: Job aborted due to stage failure: Task 10 in stage 1.0 failed 1 times, most recent failure: Lost task 10.0 in stage 1.0 (TID 11, localhost, executor driver): java.lang.IllegalStateException: Error reading delta file /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta of HDFSStateStoreProvider[id = (op=0, part=10), dir = /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10]: /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta does not exist
[info]  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:384)
[info]  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:336)
[info]  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:333)
[info]  at scala.Option.getOrElse(Option.scala:121)
[info]  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:333)
[info]  at 

[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106043087
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
--- End diff --

Added more comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106042959
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
--- End diff --

Added a `try .. finally`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105781913
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
--- End diff --

You don't seem to stop the query. It would be good to put a `try .. finally` within the `withSQLConf` block to stop the query; otherwise a failure here can lead to cascading failures in later tests.
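
Something like this (a sketch reusing the code from the test above; `started` is an illustrative name):

    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
      val started = query.start()  // `query` is the DataStreamWriter built above
      try {
        started.processAllAvailable()
        QueryTest.checkAnswer(spark.table("counts").toDF(),
          Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
          Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
      } finally {
        // Always stop, so a failed assertion cannot cascade into later tests.
        started.stop()
      }
    }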


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105781762
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
--- End diff --

Can you add a comment saying that this starts the query from existing checkpoints generated by Spark 2.1, which do not have the number of shuffle partitions recorded?
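
For instance, something along these lines above the resource lookup (wording is illustrative, not from the patch):

    // Start the query from an existing checkpoint generated by Spark 2.1.
    // 2.1 checkpoints do not record the number of shuffle partitions in the
    // offset metadata, so recovery must fall back to the current conf value.
    val resourceUri =
      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI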


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105781560
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
--- End diff --

It's not clear that this would actually re-execute a batch, and unless a batch is executed, this does not test anything. How about adding more data after `processAllAvailable()`, to ensure that at least one batch is actually executed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105781135
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
+
+// If the number of partitions is greater, should throw exception.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
--- End diff --

Can you check whether the returned error message is useful?
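
One way to do that in the test itself, sketched under the assumption that the failure surfaces as a `StreamingQueryException` from `processAllAvailable()` (`started` is an illustrative name):

    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
      val started = query.start()
      try {
        val e = intercept[StreamingQueryException] {
          started.processAllAvailable()
        }
        // The message should point at the real cause, e.g. the missing state file.
        assert(e.getMessage.contains("Error reading delta file"))
      } finally {
        started.stop()
      }
    }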


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105779702
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -70,13 +69,16 @@ object OffsetSeq {
  * bound the lateness of data that will processed. Time unit: milliseconds
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
+ * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
--- End diff --

Do you know why we have this as a var?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105312919
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

@lw-lin Hi Liwei!
Thanks for letting me know. We will not be updating the log version number, since backward and forward compatibility is preserved by this patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105310100
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

@zsxwing Changed to a map.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105310054
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +382,20 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+offsetSeqMetadata = nextOffsets
+  .metadata
+  .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))
+
+/*
+ * For backwards compatibility, if # partitions was not recorded in the offset log, then
+ * ensure it is non-zero. The new value is picked up from the conf.
+ */
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105081023
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

Hi @kunalkhamar, in case you update `OffsetSeq`'s log version number, the work being done in #17070 might be helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105069281
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -380,7 +382,20 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+offsetSeqMetadata = nextOffsets
+  .metadata
+  .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))
+
+/*
+ * For backwards compatibility, if # partitions was not recorded in the offset log, then
+ * ensure it is non-zero. The new value is picked up from the conf.
+ */
--- End diff --

For inline comments within the code, use `//` and not `/* .. */`.
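
I.e., the block above restated in the suggested style:

    // For backwards compatibility, if # partitions was not recorded in the
    // offset log, then ensure it is non-zero. The new value is picked up from the conf.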


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105050897
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

It's better to use `conf: Map[String, String]` here, because we will probably add more confs to this class in the future.
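
A minimal sketch of that shape (field and default names are illustrative, not the final patch):

    case class OffsetSeqMetadata(
        var batchWatermarkMs: Long = 0,
        var batchTimestampMs: Long = 0,
        var conf: Map[String, String] = Map.empty)

    // Recovery can then look up any persisted conf with a fallback, e.g.:
    //   metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key,
    //     sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString)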


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17216

[SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch.

## What changes were proposed in this pull request?

If the user changes the shuffle partition number between batches, streaming aggregation will fail. A sketch of the recovery behavior follows the list of cases below.

Here are some possible cases:

- Change "spark.sql.shuffle.partitions"
- Use "repartition" and change the partition number in codes
- RangePartitioner doesn't generate deterministic partitions. Right now it's safe, as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in the future.
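
A rough sketch of the recovery behavior (simplified from the StreamExecution change discussed in the review comments; `nextOffsets` is the deserialized offset log entry):

    // When resuming from the offset log:
    //  - if metadata (including # shuffle partitions) was recorded, reuse it so the
    //    state store partitioning stays consistent across restarts;
    //  - if not (e.g. a checkpoint written by Spark 2.1), fall back to the current
    //    value of spark.sql.shuffle.partitions from the session conf.
    val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    offsetSeqMetadata = nextOffsets
      .metadata
      .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))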

## How was this patch tested?

Unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark num-partitions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17216


commit 12f5fd30229e441355a05290ed124263c1429acc
Author: Kunal Khamar 
Date:   2017-03-08T21:29:02Z

Record num shuffle partitions in offset log and enforce in next batch.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org