[GitHub] spark issue #23060: [SPARK-26092][SS]Use CheckpointFileManager to write the ...

2018-11-16 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/23060
  
LGTM.


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223479786
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not make the specified change as the restarted query is likely
+  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset
+  generated with `sparkSession.readStream` (see the sketch below).
+  
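For illustration, a streaming DataFrame of the kind referred to as `sdf` could be created as follows (a minimal sketch assuming a Kafka source; the broker address and topic are placeholders):

{% highlight scala %}
// Minimal sketch: the streaming DataFrame referred to as `sdf` in the examples below
val sdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")  // placeholder broker address
  .option("subscribe", "topic")
  .load()
{% endhighlight %}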
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files are generally not allowed as the results are unpredictable:
`spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
+
+  - Changes in projections with different output schema are conditionally 
allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` 
is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
+
+- *Changes in stateful operations*: Some operations in streaming queries 
need to maintain
+  state data in order to continuously update the result. Structured 
Streaming automatically checkpoints
+  the state data to fault-tolerant storage (for example, DBFS, AWS S3, 
Azure Blob storage) and restores it after restart.
--- End diff --

replaced with HDFS


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223479414
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each micro-batch output, as sketched below. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
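For example, here is a minimal sketch of applying operations that streaming DataFrames do not support (sorting and limiting) to each micro-batch; the column name `value` is an assumption for the example:

{% highlight scala %}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // sort() and limit() are not supported on streaming DataFrames,
  // but can be applied to the bounded output of each micro-batch.
  batchDF.sort(batchDF("value").desc).limit(10).show()
}.start()
{% endhighlight %}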
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the
+  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee (see the sketch after this list).
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
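As an illustration of deduplicating with the batch ID, here is a minimal sketch; the output path and the overwrite-per-batch strategy are assumptions for the example, not part of the guide:

{% highlight scala %}
// Sketch only: key the per-batch write on batchId so that a re-executed batch
// overwrites the same location instead of appending duplicate records.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write
    .mode("overwrite")                           // idempotent for a given batchId
    .format("parquet")
    .save(s"/output/batches/batch_id=$batchId")  // hypothetical output path
}.start()
{% endhighlight %}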
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, the corresponding batch data writer does not exist, or
+you are using the continuous processing mode), then you can express your custom writer logic using `foreach`.
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
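For orientation, a minimal Scala sketch of the three-method structure (the writer body is only a placeholder, not the guide's example):

{% highlight scala %}
streamingDF.writeStream.foreach(new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // open a connection, or return false to skip this partition and epoch
    true
  }
  def process(record: Row): Unit = {
    // write one row using the opened connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection; errorOrNull is non-null if processing failed
  }
}).start()
{% endhighlight %}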
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

2018-10-08 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22627
  
@holdenk yeah, i intend to backport this to 2.4


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223456294
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not make the specified change as the restarted query is likely
+  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset
+  generated with `sparkSession.readStream`.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files are generally not allowed as the results are unpredictable:
`spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
--- End diff --

Right. 


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223456079
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
--- End diff --

Yes. I missed a few, and I want to fix them all. 


---




[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

2018-10-04 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22627
  
@zsxwing 


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/22627

[SPARK-25639] [DOCS] Added docs for foreachBatch, python foreach and 
multiple watermarks

## What changes were proposed in this pull request?

Added
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to the streaming query between restarts.

## How was this patch tested?
No tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-25639

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22627.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22627


commit f61c13ef0d4711a04b2774934641f7a4ac690165
Author: Tathagata Das 
Date:   2018-10-04T10:33:47Z

Added docs




---




[GitHub] spark issue #22507: [SPARK-25495][SS]FetchedData.reset should reset all fiel...

2018-09-25 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22507
  
LGTM. Thanks for the explanation.


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220061053
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Post more messages to Kafka so that the executors will fetch 
messages in the next batch
+// and drop them. In this case, if we forget to reset 
`FetchedData._nextOffsetInFetchedData`
--- End diff --

Make it clear that you " want to send more message *before* the tasks of 
the current batch start reading the current batch data, so that the executors 


also, I am not entirely sure how it causes `fetchedData.reset()` thus 
creating the issue. Are you sure this fails without your fix?


---




[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220059919
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Post more messages to Kafka so that the executors will fetch 
messages in the next batch
+// and drop them. In this case, if we forget to reset 
`FetchedData._nextOffsetInFetchedData`
--- End diff --

what do you mean by drop them?


---




[GitHub] spark issue #22476: [SPARK-24157][SS][FOLLOWUP] Rename to spark.sql.streamin...

2018-09-19 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22476
  
LGTM.
Please make sure to merge it to 2.4


---




[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

2018-09-11 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22386
  
LGTM. Just one super nit. 


---




[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

2018-09-11 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22386#discussion_r216781599
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
 ---
@@ -74,9 +75,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
 // If we're in continuous processing mode, we should get the store 
version for the current
 // epoch rather than the one at planning time.
-val currentVersion = EpochTracker.getCurrentEpoch match {
-  case None => storeVersion
-  case Some(value) => value
+val isContinuous = 
Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
+  .map(_.toBoolean)
+val currentVersion = if (isContinuous.contains(true)) {
--- End diff --

super nit: this looks weird. Rather, I would change the previous line to `val isContinuous = ... .map(_.toBoolean).getOrElse(false)`
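
A rough sketch of what the suggested simplification might look like (illustrative only, not the actual patch):

    // Fold the default into isContinuous so the branch can test a plain Boolean.
    val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
      .map(_.toBoolean)
      .getOrElse(false)
    val currentVersion = if (isContinuous) {
      // In continuous processing, use the store version for the current epoch.
      EpochTracker.getCurrentEpoch.getOrElse(storeVersion)
    } else {
      storeVersion
    }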


---




[GitHub] spark issue #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction tests

2018-08-30 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22293
  
I was afraid that this would be flaky. Glad you found a solution quickly. Just one comment to improve code readability.


---




[GitHub] spark pull request #22293: [SPARK-25288][Tests]Fix flaky Kafka transaction t...

2018-08-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22293#discussion_r214208589
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -652,62 +654,67 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 }
 
+val topicPartition = new TopicPartition(topic, 0)
 // The message values are the same as their offsets to make the test 
easy to follow
 testUtils.withTranscationalProducer { producer =>
   testStream(mapped)(
 StartStream(ProcessingTime(100), clock),
 waitUntilBatchProcessed,
 CheckAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition) { () =>
--- End diff --

It's weird to have a hanging "5" in the thunk. Rather, take the expected offset as a parameter of the helper. That is, `WithOffsetSync(topicPartition, expectedOffset = 5) {`


---




[GitHub] spark issue #22042: [SPARK-25005][SS]Support non-consecutive offsets for Kaf...

2018-08-28 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22042
  
LGTM.


---




[GitHub] spark issue #22230: [SPARK-25214][SS][FOLLOWUP]Fix the issue that Kafka v2 s...

2018-08-24 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22230
  
LGTM.


---




[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212706003
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps 
only several records in
+ * a topic and ages out records very quickly. This is a helper trait to 
test
+ * "failonDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-code 30 seconds delay 
(kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 
30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be 
removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in 
`eventually` and make sure it's
+ * not 0 rather than sleeping a hard-code duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+new TestSparkSession(new SparkContext("local[2,3]", 
"test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+// Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code
+// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so 
this test should run at
+// least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+// The size of RecordBatch V2 increases to support transactional 
write.
+props.put("log.segment.bytes", "70")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = 
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a 
batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] 
to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+  testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => 
Unit): Unit = {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
  

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212706908
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps 
only several records in
+ * a topic and ages out records very quickly. This is a helper trait to 
test
+ * "failonDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-code 30 seconds delay 
(kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 
30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be 
removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in 
`eventually` and make sure it's
+ * not 0 rather than sleeping a hard-code duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+new TestSparkSession(new SparkContext("local[2,3]", 
"test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+// Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code
+// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so 
this test should run at
+// least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+// The size of RecordBatch V2 increases to support transactional 
write.
+props.put("log.segment.bytes", "70")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = 
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a 
batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] 
to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+  testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => 
Unit): Unit = {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
  

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212706859
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps 
only several records in
+ * a topic and ages out records very quickly. This is a helper trait to 
test
+ * "failonDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-code 30 seconds delay 
(kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 
30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be 
removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in 
`eventually` and make sure it's
+ * not 0 rather than sleeping a hard-code duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+new TestSparkSession(new SparkContext("local[2,3]", 
"test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+// Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code
+// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so 
this test should run at
+// least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+// The size of RecordBatch V2 increases to support transactional 
write.
+props.put("log.segment.bytes", "70")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = 
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a 
batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] 
to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+  testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => 
Unit): Unit = {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
  

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-24 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212526645
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps 
only several records in
+ * a topic and ages out records very quickly. This is a helper trait to 
test
+ * "failonDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-code 30 seconds delay 
(kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 
30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be 
removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in 
`eventually` and make sure it's
+ * not 0 rather than sleeping a hard-code duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+new TestSparkSession(new SparkContext("local[2,3]", 
"test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+// Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code
+// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so 
this test should run at
+// least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+// The size of RecordBatch V2 increases to support transactional 
write.
+props.put("log.segment.bytes", "70")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = 
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a 
batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] 
to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+  testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => 
Unit): Unit = {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
  

[GitHub] spark pull request #22207: [SPARK-25214][SS]Fix the issue that Kafka v2 sour...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22207#discussion_r212526373
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps 
only several records in
+ * a topic and ages out records very quickly. This is a helper trait to 
test
+ * "failonDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-code 30 seconds delay 
(kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 
30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be 
removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in 
`eventually` and make sure it's
+ * not 0 rather than sleeping a hard-code duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+new TestSparkSession(new SparkContext("local[2,3]", 
"test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils {
+  override def brokerConfiguration: Properties = {
+val props = super.brokerConfiguration
+// Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code
+// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so 
this test should run at
+// least 30 seconds.
+props.put("log.cleaner.backoff.ms", "100")
+// The size of RecordBatch V2 increases to support transactional 
write.
+props.put("log.segment.bytes", "70")
+props.put("log.retention.bytes", "40")
+props.put("log.retention.check.interval.ms", "100")
+props.put("delete.retention.ms", "10")
+props.put("log.flush.scheduler.interval.ms", "10")
+props
+  }
+}
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = 
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a 
batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] 
to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+  testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => 
Unit): Unit = {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
  

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212507190
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext {
   s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithKafkaProducer {
+def apply(
+topic: String,
+producer: KafkaProducer[String, String])(
--- End diff --

Ping on this comment. Maybe you missed this?


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212522664
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
   s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithKafkaProducer {
--- End diff --

nit: This is not creating a KafkaProducer, unlike most `With***` methods. The point of this is to force synchronization of the consumer. So maybe rename it to `WithOffsetSync { ... }`?


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212521083
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -239,56 +335,74 @@ private[kafka010] case class InternalKafkaConsumer(
   }
 
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset 
within [offset, untilOffset),
-   * or null.
+   * Get the fetched record for the given offset if available.
+   *
+   * If the record is invisible (either a  transaction message, or an 
aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will return a 
`FetchedRecord` with the
+   * next offset to fetch.
+   *
+   * This method also will try the best to detect data loss. If 
`failOnDataLoss` is true`, it will
+   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
+   * method will return `null` if the next available record is within 
[offset, untilOffset).
*
* @throws OffsetOutOfRangeException if `offset` is out of range
* @throws TimeoutException if cannot fetch the record in 
`pollTimeoutMs` milliseconds.
*/
-  private def fetchData(
+  private def fetchRecord(
   offset: Long,
   untilOffset: Long,
   pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
-  // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
-  seek(offset)
-  poll(pollTimeoutMs)
-}
-
-if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
-  // - `offset` is out of range so that Kafka returns nothing. Just 
throw
-  // `OffsetOutOfRangeException` to let the caller handle it.
-  // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-  val range = getAvailableOffsetRange()
-  if (offset < range.earliest || offset >= range.latest) {
-throw new OffsetOutOfRangeException(
-  Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+  failOnDataLoss: Boolean): FetchedRecord = {
+if (offset != fetchedData.nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
+  // Fetch records from Kafka and update `fetchedData`.
+  fetchData(offset, pollTimeoutMs)
+} else if (!fetchedData.hasNext) { // The last pre-fetched data has 
been drained.
+  if (offset < fetchedData.offsetAfterPoll) {
+// Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. 
Return a record to ask
+// the next call to start from `fetchedData.offsetAfterPoll`.
+fetchedData.reset()
+return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
   } else {
-throw new TimeoutException(
-  s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+// Fetch records from Kafka and update `fetchedData`.
+fetchData(offset, pollTimeoutMs)
   }
+}
+
+if (!fetchedData.hasNext) {
+  // When we reach here, we have already tried to poll from Kafka. As 
`fetchedData` is still
+  // empty, all messages in [offset, fetchedData.offsetAfterPoll) are 
invisible. Return a
+  // record to ask the next call to start from 
`fetchedData.offsetAfterPoll`.
+  assert(offset <= fetchedData.offsetAfterPoll,
+s"seek to $offset and poll but the offset was reset to 
${fetchedData.offsetAfterPoll}")
+  fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
 } else {
   val record = fetchedData.next()
-  nextOffsetInFetchedData = record.offset + 1
   // In general, Kafka uses the specified offset as the start point, 
and tries to fetch the next
   // available offset. Hence we need to handle offset mismatch.
   if (record.offset > offset) {
+val range = getAvailableOffsetRange()
+if (range.earliest <= offset) {
+  // `offset` is still valid but the corresponding message is 
invisible. We should skip it
+  // and jump to `record.offset`. Here we move `fetchedData` back 
so that the next call of
+  // `fetchRecord` can just return `record` directly.
+  fetchedData.previous()
+  return fet

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212522432
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -161,6 +161,22 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
   s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithKafkaProducer {
+def apply(
+topic: String,
+producer: KafkaProducer[String, String])(
+func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
--- End diff --

nit: AssertOnQuery -> StreamAction


---




[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r212504622
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
 ---
@@ -331,6 +331,7 @@ private[kafka010] case class 
KafkaMicroBatchPartitionReader(
 offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
 
   private val rangeToRead = resolveRange(offsetRange)
+
--- End diff --

unnecessary 


---




[GitHub] spark issue #22191: [SPARK-25204][SS] Fix race in rate source test.

2018-08-23 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22191
  
LGTM.


---




[GitHub] spark pull request #22182: [SPARK-25184][SS] Fixed race condition in StreamE...

2018-08-21 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/22182

[SPARK-25184][SS] Fixed race condition in StreamExecution that caused flaky 
test in FlatMapGroupsWithState

## What changes were proposed in this pull request?

The race condition that caused test failure is between 2 threads.
- The MicrobatchExecution thread that processes inputs to produce answers 
and then generates progress events.
- The test thread that generates some input data, checks the answer, and then verifies the progress event generated by the query.

The synchronization structure between these threads is as follows
1. MicrobatchExecution thread, in every batch, does the following in order.
   a. Processes batch input to generate answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for 
progress using `awaitOffset`
   c. Generates progress event

2. Test execution thread
   a. Calls `awaitOffset` to wait for progress, which waits on 
`awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, it would move on in the test to check the answer.
  c. Finally, it would verify the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b 
-> 2c -> 1c.
In other words, the progress event may be generated after the test tries to 
verify it.

The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, 
that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a 
large value.

The latter is to ensure that the test thread keeps waiting on 
`awaitProgressLockCondition` until the MicroBatchExecution thread explicitly 
signals it. With the existing small timeout of 100 ms, the following sequence can 
occur.
 - MicroBatchExecution thread updates committed offsets
 - Test thread waiting on `awaitProgressLockCondition` accidentally times 
out after 100 ms, finds that the committed offsets have been updated, therefore 
returns from `awaitOffset` and moves on to the progress event tests.
 - MicroBatchExecution thread then generates progress event and signals. 
But the test thread has already attempted to verify the event and failed.

By increasing the timeout to a large value (e.g., `streamingTimeoutMs = 60 
seconds`, similar to `awaitInitialization`), the above type of race condition 
is also avoided.
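
For illustration, here is a minimal, self-contained sketch of the intended ordering. It is not the actual MicroBatchExecution code; the object, lock, and field names are invented for the example. The point is only that the batch thread signals waiters after the progress event exists, and that waiters use a large timeout so they effectively wake up only on an explicit signal.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object ProgressSignalingSketch {
  private val lock = new ReentrantLock()
  private val condition = lock.newCondition()
  private var committedOffset: Long = -1L
  private var lastProgressEvent: Option[String] = None

  def lastProgress: Option[String] = lastProgressEvent

  // Batch thread: produce the answer, generate the progress event, and only
  // THEN signal the waiters (1a -> 1c -> 1b).
  def runBatch(batchId: Long): Unit = {
    lock.lock()
    try {
      committedOffset = batchId                       // 1a: answer is available
      lastProgressEvent = Some(s"progress-$batchId")  // 1c: progress event exists
      condition.signalAll()                           // 1b: now wake up waiters
    } finally {
      lock.unlock()
    }
  }

  // Test thread: wait with a large timeout (e.g. 60 s) so an accidental
  // timeout cannot let the test race ahead of the progress event.
  def awaitOffset(target: Long, timeoutMs: Long = 60000L): Boolean = {
    lock.lock()
    try {
      var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      while (committedOffset < target && remainingNanos > 0) {
        remainingNanos = condition.awaitNanos(remainingNanos)
      }
      committedOffset >= target
    } finally {
      lock.unlock()
    }
  }
}
```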

## How was this patch tested?
Ran locally many times.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-25184

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22182


commit 319990ff60ad7b6fad6fd0cea5cada0b22e3f3c9
Author: Tathagata Das 
Date:   2018-08-22T04:44:59Z

[SC-12136][SS][HOTFIX] Fixed race condition in StreamExecution that caused 
flaky test in FlatMapGroupsWithState

The race condition that caused test failure is between 2 threads.
- The MicrobatchExecution thread that processes inputs to produce answers 
and then generates progress events.
- The test thread that generates some input data, checks the answer, and then verifies the progress event generated by the query.

The synchronization structure between these threads is as follows
1. MicrobatchExecution thread, in every batch, does the following in order.
   a. Processes batch input to generate answer.
   b. Signals `awaitProgressLockCondition` to wake up threads waiting for 
progress using `awaitOffset`
   c. Generates progress event

2. Test execution thread
   a. Calls `awaitOffset` to wait for progress, which waits on 
`awaitProgressLockCondition`.
   b. As soon as `awaitProgressLockCondition` is signaled, it would move on in the test to check the answer.
  c. Finally, it would verify the last generated progress event.

What can happen is the following sequence of events: 2a -> 1a -> 1b -> 2b 
-> 2c -> 1c.
In other words, the progress event may be generated after the test tries to 
verify it.

The solution has two steps.
1. Signal the waiting thread after the progress event has been generated, 
that is, after `finishTrigger()`.
2. Increase the timeout of `awaitProgressLockCondition.await(100 ms)` to a 
large value.

The latter is to ensure that the test thread keeps waiting on 
`awaitProgressLockCondition` until the MicroBatchExecution thread explicitly 
signals it. With the existing small timeout of 100 ms, the following sequence can 
occur.
 - MicroBatchExecution thread updates committed offsets
 - Test thread waiting 

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211805733
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
+  eventually(Timeout(streamingTimeout)) {
+if (!q.exception.isDefined) {
+  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+  }
+  if (q.exception.isDefined) {
+throw q.exception.get
+  }
+  true
+}
+
+val producer = testUtils.createProducer(usingTrascation = true)
+try {
+  producer.initTransactions()
+
+  testStream(mapped)(
+StartStream(ProcessingTime(100), clock),
+waitUntilBatchProcessed,
+// 1 from smallest, 1 from middle, 8 from biggest
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages. They should be visible only after being 
committed.
+  producer.beginTransaction()
+  (1 to 5).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+// Should not see any uncommitted messages
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages and abort the transaction. They should not be 
read.
+  producer.beginTransaction()
+  (6 to 10).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.abortTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages again. The consumer should skip the above 
aborted messages and read
+  // them.
+  producer.beginTransaction()
+  (11 to 15).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211801632
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -346,11 +437,40 @@ private[kafka010] case class InternalKafkaConsumer(
 consumer.seek(topicPartition, offset)
   }
 
-  private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and set `fetchedData` 
and `offsetAfterPoll`.
+   * `fetchedData` may be empty if the Kafka fetches some messages but all 
of them are not visible
+   * messages (either transaction messages, or aborted messages when 
`isolation.level` is
+   * `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
--- End diff --

Maybe rename this method to be consistent with what it does: fetch data.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211802112
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
   offset: Long,
   untilOffset: Long,
   pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+  failOnDataLoss: Boolean): FetchedRecord = {
+if (offset != fetchedData.nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
-  seek(offset)
-  poll(pollTimeoutMs)
-}
-
-if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
-  // - `offset` is out of range so that Kafka returns nothing. Just 
throw
-  // `OffsetOutOfRangeException` to let the caller handle it.
-  // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-  val range = getAvailableOffsetRange()
-  if (offset < range.earliest || offset >= range.latest) {
-throw new OffsetOutOfRangeException(
-  Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+  poll(offset, pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < fetchedData.offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
--- End diff --

"skip them" is confusing. What does it mean to skip? Why are we still 
returning something.
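
For context, here is a hedged sketch of what the caller can do with the returned value; it is a hypothetical, simplified loop using the names from the diff, not the actual `get` implementation. A null record means "nothing visible at this offset, resume from `nextOffsetToFetch`", which is what "skipping" the missing offsets amounts to.

```scala
// Hypothetical caller loop: retry from nextOffsetToFetch whenever fetchRecord
// reports that the requested offset holds no visible record.
var toFetch = offset
var result: ConsumerRecord[Array[Byte], Array[Byte]] = null
while (result == null && toFetch < untilOffset) {
  val fetched = fetchRecord(toFetch, untilOffset, pollTimeoutMs, failOnDataLoss)
  if (fetched.record != null) {
    result = fetched.record               // found a visible data message
  } else {
    toFetch = fetched.nextOffsetToFetch   // gap: jump over the invisible offsets
  }
}
```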


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211801676
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
   offset: Long,
   untilOffset: Long,
   pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+  failOnDataLoss: Boolean): FetchedRecord = {
+if (offset != fetchedData.nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
-  seek(offset)
-  poll(pollTimeoutMs)
-}
-
-if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
-  // - `offset` is out of range so that Kafka returns nothing. Just 
throw
-  // `OffsetOutOfRangeException` to let the caller handle it.
-  // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-  val range = getAvailableOffsetRange()
-  if (offset < range.earliest || offset >= range.latest) {
-throw new OffsetOutOfRangeException(
-  Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+  poll(offset, pollTimeoutMs)
--- End diff --

comment that this method updates `fetchedData`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211801968
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
   offset: Long,
   untilOffset: Long,
   pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+  failOnDataLoss: Boolean): FetchedRecord = {
+if (offset != fetchedData.nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
-  seek(offset)
-  poll(pollTimeoutMs)
-}
-
-if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
-  // - `offset` is out of range so that Kafka returns nothing. Just 
throw
-  // `OffsetOutOfRangeException` to let the caller handle it.
-  // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-  val range = getAvailableOffsetRange()
-  if (offset < range.earliest || offset >= range.latest) {
-throw new OffsetOutOfRangeException(
-  Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+  poll(offset, pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
--- End diff --

nit: I was confused about whether the above comment was for the `else if` 
above it or for the `if` below it. Maybe inline it with the `else if`, or leave 
a blank line after it, before the `if` below.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211795985
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -80,6 +90,72 @@ private[kafka010] case class InternalKafkaConsumer(
 kafkaParams: ju.Map[String, Object]) extends Logging {
   import InternalKafkaConsumer._
 
+  /**
+   * The internal object to store the fetched data from Kafka consumer and 
the next offset to poll.
+   *
+   * @param records the pre-fetched Kafka records.
+   * @param nextOffsetInFetchedData the next offset in `records`. We use 
this to verify if we should
+   *check if the pre-fetched data is still 
valid.
+   * @param offsetAfterPoll the Kafka offset after calling `poll`. We will 
use this offset to poll
+   *when `records` is drained.
+   */
+  private case class FetchedData(
+  private var records: ju.ListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+  var nextOffsetInFetchedData: Long,
--- End diff --

Make this public getter, private setter.
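
A minimal sketch of the suggested shape; the class is simplified and the underscore-prefixed names are invented for the example, so this is not the final implementation. The mutable offsets stay private and are exposed through read-only accessors, so only `FetchedData` itself can advance them.

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord

// Simplified FetchedData: callers can read the offsets but cannot set them.
class FetchedData(
    private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long) {

  def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
  def offsetAfterPoll: Long = _offsetAfterPoll

  def hasNext: Boolean = records.hasNext

  def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    val record = records.next()
    _nextOffsetInFetchedData = record.offset + 1  // mutated only inside the class
    record
  }

  // Replace the buffered records after a new poll.
  def withNewPoll(
      newRecords: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
      newOffsetAfterPoll: Long): this.type = {
    records = newRecords
    _offsetAfterPoll = newOffsetAfterPoll
    this
  }
}
```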


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211805275
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
+  eventually(Timeout(streamingTimeout)) {
+if (!q.exception.isDefined) {
+  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+  }
+  if (q.exception.isDefined) {
+throw q.exception.get
+  }
+  true
+}
+
+val producer = testUtils.createProducer(usingTrascation = true)
+try {
+  producer.initTransactions()
+
+  testStream(mapped)(
+StartStream(ProcessingTime(100), clock),
+waitUntilBatchProcessed,
+// 1 from smallest, 1 from middle, 8 from biggest
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages. They should be visible only after being 
committed.
+  producer.beginTransaction()
+  (1 to 5).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+// Should not see any uncommitted messages
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages and abort the transaction. They should not be 
read.
+  producer.beginTransaction()
+  (6 to 10).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.abortTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages again. The consumer should skip the above 
aborted messages and read
+  // them.
+  producer.beginTransaction()
+  (11 to 15).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+AdvanceManualClock(100),
+wait

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211801549
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
   offset: Long,
--- End diff --

Maybe rename this method to fetchRecord, to make it consistent with the return 
type.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211804454
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
--- End diff --

Use `Execute`, and comment on what this does and why we need it.
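
Something along these lines, as a hedged sketch; it assumes `StreamTest`'s `Execute` helper plus the suite's `clock`, `streamingTimeout`, and ScalaTest's `eventually`, and is not necessarily the exact final code.

```scala
// Block until the stream is parked on the manual clock, i.e. the previous
// batch has been fully processed, so that advancing the clock starts exactly
// one new batch. Rethrow any query error so the test fails fast.
val waitUntilBatchProcessed = Execute { q =>
  eventually(Timeout(streamingTimeout)) {
    if (q.exception.isEmpty) {
      assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    }
  }
  if (q.exception.isDefined) {
    throw q.exception.get
  }
}
```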


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211802489
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -288,7 +385,7 @@ private[kafka010] case class InternalKafkaConsumer(
 null
--- End diff --

We should not be returning null EVER when we are using 
`FetchedRecord.record = null` to signify lack of record.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211801254
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -250,33 +341,39 @@ private[kafka010] case class InternalKafkaConsumer(
   offset: Long,
   untilOffset: Long,
   pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+  failOnDataLoss: Boolean): FetchedRecord = {
+if (offset != fetchedData.nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
-  seek(offset)
-  poll(pollTimeoutMs)
-}
-
-if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
-  // - `offset` is out of range so that Kafka returns nothing. Just 
throw
-  // `OffsetOutOfRangeException` to let the caller handle it.
-  // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-  val range = getAvailableOffsetRange()
-  if (offset < range.earliest || offset >= range.latest) {
-throw new OffsetOutOfRangeException(
-  Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+  poll(offset, pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < fetchedData.offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
+fetchedData.reset()
+return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
   } else {
-throw new TimeoutException(
-  s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+poll(offset, pollTimeoutMs)
   }
+}
+
+if (!fetchedData.hasNext) {
+  assert(offset <= fetchedData.offsetAfterPoll,
--- End diff --

Add comments here on what this case means.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211804879
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
+  eventually(Timeout(streamingTimeout)) {
+if (!q.exception.isDefined) {
+  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+  }
+  if (q.exception.isDefined) {
+throw q.exception.get
+  }
+  true
+}
+
+val producer = testUtils.createProducer(usingTrascation = true)
+try {
+  producer.initTransactions()
+
+  testStream(mapped)(
+StartStream(ProcessingTime(100), clock),
+waitUntilBatchProcessed,
+// 1 from smallest, 1 from middle, 8 from biggest
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages. They should be visible only after being 
committed.
+  producer.beginTransaction()
+  (1 to 5).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+// Should not see any uncommitted messages
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
--- End diff --

Why is this `waitUntilBatchProcessed` needed? CheckAnswer waits for the 
batch to complete anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211803267
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -337,6 +338,7 @@ private[kafka010] case class 
KafkaMicroBatchInputPartitionReader(
   val record = consumer.get(nextOffset, rangeToRead.untilOffset, 
pollTimeoutMs, failOnDataLoss)
   if (record != null) {
 nextRow = converter.toUnsafeRow(record)
+nextOffset = record.offset + 1
--- End diff --

why this change?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211805821
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
+  eventually(Timeout(streamingTimeout)) {
+if (!q.exception.isDefined) {
+  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+  }
+  if (q.exception.isDefined) {
+throw q.exception.get
+  }
+  true
+}
+
+val producer = testUtils.createProducer(usingTrascation = true)
+try {
+  producer.initTransactions()
+
+  testStream(mapped)(
+StartStream(ProcessingTime(100), clock),
+waitUntilBatchProcessed,
+// 1 from smallest, 1 from middle, 8 from biggest
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages. They should be visible only after being 
committed.
+  producer.beginTransaction()
+  (1 to 5).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+// Should not see any uncommitted messages
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages and abort the transaction. They should not be 
read.
+  producer.beginTransaction()
+  (6 to 10).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.abortTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages again. The consumer should skip the above 
aborted messages and read
+  // them.
+  producer.beginTransaction()
+  (11 to 15).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211805409
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   }
 )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+// This test will cover the following cases:
+// 1. the whole batch contains no data messages
+// 2. the first offset in a batch is not a committed data message
+// 3. the last offset in a batch is not a committed data message
+// 4. there is a gap in the middle of a batch
+
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 1)
+
+val reader = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("maxOffsetsPerTrigger", 3)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  // Set a short timeout to make the test fast. When a batch contains 
no committed date
+  // messages, "poll" will wait until timeout.
+  .option("kafkaConsumer.pollTimeoutMs", 5000)
+val kafka = reader.load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
+
+val clock = new StreamManualClock
+
+val waitUntilBatchProcessed = AssertOnQuery { q =>
+  eventually(Timeout(streamingTimeout)) {
+if (!q.exception.isDefined) {
+  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+}
+  }
+  if (q.exception.isDefined) {
+throw q.exception.get
+  }
+  true
+}
+
+val producer = testUtils.createProducer(usingTrascation = true)
+try {
+  producer.initTransactions()
+
+  testStream(mapped)(
+StartStream(ProcessingTime(100), clock),
+waitUntilBatchProcessed,
+// 1 from smallest, 1 from middle, 8 from biggest
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages. They should be visible only after being 
committed.
+  producer.beginTransaction()
+  (1 to 5).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+// Should not see any uncommitted messages
+CheckAnswer(),
+WithKafkaProducer(topic, producer) { producer =>
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages and abort the transaction. They should not be 
read.
+  producer.beginTransaction()
+  (6 to 10).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.abortTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+WithKafkaProducer(topic, producer) { producer =>
+  // Send 5 messages again. The consumer should skip the above 
aborted messages and read
+  // them.
+  producer.beginTransaction()
+  (11 to 15).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+},
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+AdvanceManualClock(100),
+waitUntilBatchProcessed,
+CheckAnswer((1 to 5) ++ (

[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211804704
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext {
   s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithKafkaProducer {
+def apply(
+topic: String,
+producer: KafkaProducer[String, String])(
--- End diff --

Why pass the producer when all you are doing is passing it to the function? The 
function can capture it on its own.
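
For instance, a hedged sketch of the simplification; the helper name is hypothetical and it reuses the `AssertOnQuery(_ => ...)` pattern already used in the diff. The enclosing test already has `producer` and `topic` in scope, so the closure can simply capture them.

```scala
// Hypothetical helper: run an arbitrary Kafka side effect as a stream action.
object WithKafka {
  def apply(func: => Unit): AssertOnQuery = AssertOnQuery { _ =>
    func
    true
  }
}

// Usage inside the test (producer and topic are captured from the enclosing scope):
// WithKafka {
//   producer.beginTransaction()
//   (1 to 5).foreach { i =>
//     producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
//   }
//   producer.commitTransaction()
// }
```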


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211803763
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -160,6 +160,23 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext {
   s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithKafkaProducer {
+def apply(
+topic: String,
+producer: KafkaProducer[String, String])(
+func: KafkaProducer[String, String] => Unit): AssertOnQuery = {
+  AssertOnQuery(_ => {
--- End diff --

nit: use Execute


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-21 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r211805993
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -327,6 +332,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
 props
   }
 
+  def createProducer(usingTrascation: Boolean): KafkaProducer[String, 
String] = {
--- End diff --

nit: usingTrascation -> usingTransaction


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
I did. Fixed the import


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22175: [MINOR] Added import to fix compilation

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22175
  
Merged as compilation passed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22175: [MINOR] Added import to fix compilation

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/22175
  
jenkins retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22175: [MINOR] Added import to fix compilation

2018-08-21 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/22175

[MINOR] Added import to fix compilation

## What changes were proposed in this pull request?

Two back-to-back PRs implicitly conflicted: one PR removed an existing import 
that the other PR needed. This did not cause an explicit conflict because the 
import already existed, but was not used. 
 
## How was this patch tested?
It compiles!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark fix-build

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22175


commit d540d5af2b6ca6f0b09ebe1a36da640c3e48aea8
Author: Tathagata Das 
Date:   2018-08-21T23:15:55Z

Added import to fix compilation




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
Unfortunately this PR broke the master build. Looks like some import that 
probably got removed in the other PR I merged, which didn't create any direct 
conflict.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
LGTM. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
@HeartSaVioR I think I agree with the second approach that you suggested. So 
`memoryUsedBytes` => `size for total memory usage of loaded versions` and
`customMetric` => `size for memory usage of latest version`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21733
  
Good point. That can be a minor PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21733
  
LGTM. Will merge when tests pass. :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-21 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21733
  
This looks good!! Only one comment: please don't add the .crc files. They 
are useless and add unnecessary clutter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r210422755
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -31,22 +31,21 @@ import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
-/**
- * An exception to indicate there is a missing offset in the records 
returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an 
aborted message if
- * "isolation.level" is "read_committed". The offsets in the range 
[offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset 
to fetch.
- */
-private[kafka010] class MissingOffsetException(
-val offset: Long,
-val nextOffsetToFetch: Long) extends Exception(
-  s"Offset $offset is missing. The next offset to fetch is: 
$nextOffsetToFetch")
-
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset 
within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try the best to detect data loss. If 
`failOnDataLoss` is `false`, it will
+   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `true`, this
--- End diff --

Will we throw an exception even when it's a control message and there is no 
real data loss?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r210423180
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -91,6 +90,17 @@ private[kafka010] case class InternalKafkaConsumer(
 kafkaParams: ju.Map[String, Object]) extends Logging {
   import InternalKafkaConsumer._
 
+  /**
+   * The internal object returned by the `fetchData` method. If `record` 
is empty, it means it is
+   * invisible (either a transaction message, or an aborted message when 
the consumer's
+   * `isolation.level` is `read_committed`), and the caller should use 
`nextOffsetToFetch` to fetch
+   * instead.
+   */
+  private case class FetchedRecord(
+record: Option[ConsumerRecord[Array[Byte], Array[Byte]]],
--- End diff --

Can't we reuse the objects here? And do we need to have an Option, thus 
creating a lot of Option objects all the time?
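
A hedged sketch of the allocation-free alternative; the field names follow the diff, but the class is simplified and this is not the final code. One mutable holder is kept per consumer, and a null record (rather than an Option) signifies "no visible record".

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Single reusable holder: the hot fetch path mutates it in place instead of
// allocating a new result object (or Some/None) for every fetched offset.
case class FetchedRecord(
    var record: ConsumerRecord[Array[Byte], Array[Byte]],
    var nextOffsetToFetch: Long) {

  def withRecord(
      record: ConsumerRecord[Array[Byte], Array[Byte]],
      nextOffsetToFetch: Long): FetchedRecord = {
    this.record = record
    this.nextOffsetToFetch = nextOffsetToFetch
    this
  }
}
```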


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r210985375
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
 consumer.seek(topicPartition, offset)
   }
 
-  private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and set `fetchedData` 
and `offsetAfterPoll`.
+   * `fetchedData` may be empty if the Kafka fetches some messages but all 
of them are not visible
+   * messages (either transaction messages, or aborted messages when 
`isolation.level` is
+   * `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
+seek(offset)
 val p = consumer.poll(pollTimeoutMs)
 val r = p.records(topicPartition)
 logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
-fetchedData = r.iterator
+offsetAfterPoll = consumer.position(topicPartition)
--- End diff --

I strongly think that this should not be a var, but rather a clear return 
value. We have been burnt by too many mutable vars/defs (see all the flakiness 
caused by the structured ProgressReporter), and we should consciously try to 
improve this everywhere by not having vars all over the place.
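
As a hedged illustration only (the names and signature are invented, the body is simplified, and it assumes the surrounding `InternalKafkaConsumer` members such as `consumer`, `topicPartition`, and the `ju` alias): poll could return both the fetched records and the consumer position after the poll, instead of leaving them in mutable fields.

```scala
// Hypothetical shape: everything the caller needs comes back as a value.
private case class PollResult(
    records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    offsetAfterPoll: Long)

private def pollFrom(offset: Long, pollTimeoutMs: Long): PollResult = {
  consumer.seek(topicPartition, offset)
  val polled = consumer.poll(pollTimeoutMs).records(topicPartition)
  PollResult(polled.listIterator(), consumer.position(topicPartition))
}
```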


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-17 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r210422521
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -31,22 +31,21 @@ import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
-/**
- * An exception to indicate there is a missing offset in the records 
returned by Kafka consumer.
- * This means it's either a transaction (commit or abort) marker, or an 
aborted message if
- * "isolation.level" is "read_committed". The offsets in the range 
[offset, nextOffsetToFetch) are
- * missing. In order words, `nextOffsetToFetch` indicates the next offset 
to fetch.
- */
-private[kafka010] class MissingOffsetException(
-val offset: Long,
-val nextOffsetToFetch: Long) extends Exception(
-  s"Offset $offset is missing. The next offset to fetch is: 
$nextOffsetToFetch")
-
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset 
within [offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try the best to detect data loss. If 
`failOnDataLoss` is `false`, it will
--- End diff --

if failOnDataLoss is *true* then it should throw an exception... shouldn't it?

nit: try its best



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209479417
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
   pollTimeoutMs: Long,
   failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+if (offset != nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
+resetFetchedData()
+throw new MissingOffsetException(offset, offsetAfterPoll)
--- End diff --

So MissingOffsetException is only used to signal that some offset may be 
missing due to control messages and nothing else. And the higher function (i.e. 
`get`) just handles it by resetting the fetched offsets. Why not let this 
`fetchData` method handle the situation instead of creating a new exception 
just for this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209479551
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
 offsetRanges.zipWithIndex.map { case (o, i) => new 
KafkaSourceRDDPartition(i, o) }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.size).sum
--- End diff --

Good catch. That would have never occurred to me!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209475048
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
   pollTimeoutMs: Long,
   failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+if (offset != nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < offsetAfterPoll) {
--- End diff --

It's hard to understand this condition because it's hard to understand what 
offsetAfterPoll means. Does it refer to the offset that will be fetched next by 
the KafkaConsumer? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209476712
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -347,9 +391,12 @@ private[kafka010] case class InternalKafkaConsumer(
   }
 
   private def poll(pollTimeoutMs: Long): Unit = {
+offsetBeforePoll = consumer.position(topicPartition)
--- End diff --

This variable `offsetBeforePoll` seems to be used only to identify whether 
data was actually fetched in a poll, and nothing else. Rather than defining 
another var (there are already many that are confusing), why not just 
return a boolean from `poll` that indicates whether the poll moved anything?
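
A minimal sketch of that alternative, with stand-in types rather than the real consumer, so this is only the shape of the idea and not the actual code:

```scala
// Hypothetical sketch: poll() returns whether the position moved, so no
// separate offsetBeforePoll field is needed.
class SimplifiedConsumer(records: Seq[(Long, String)]) {
  private var position: Long = 0L
  private var fetched: Iterator[(Long, String)] = Iterator.empty

  /** Poll for more records; return true iff the poll advanced the position. */
  private def poll(): Boolean = {
    val before = position
    val batch = records.filter(_._1 >= position).take(10)
    fetched = batch.iterator
    position = batch.lastOption.map(_._1 + 1).getOrElse(position)
    position != before
  }

  def next(): Option[(Long, String)] =
    if (fetched.hasNext || poll()) Some(fetched.next()) else None
}
```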


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209473392
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -31,6 +31,17 @@ import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
+/**
+ * An exception to indicate there is a missing offset in the records 
returned by Kafka consumer.
+ * This means it's either a transaction (commit or abort) marker, or an 
aborted message if
+ * "isolation.level" is "read_committed". The offsets in the range 
[offset, nextOffsetToFetch) are
+ * missing. In order words, `nextOffsetToFetch` indicates the next offset 
to fetch.
+ */
+private[kafka010] class MissingOffsetException(
--- End diff --

nit: Is this meant to be used outside this KafkaDataConsumer class? If not, 
then maybe make it an inner class of KafkaDataConsumer.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209477156
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
   pollTimeoutMs: Long,
   failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+if (offset != nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
+resetFetchedData()
+throw new MissingOffsetException(offset, offsetAfterPoll)
+  } else {
+seek(offset)
+poll(pollTimeoutMs)
+  }
 }
 
 if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
+  // We cannot fetch anything after `poll`. Three possible cases:
   // - `offset` is out of range so that Kafka returns nothing. Just 
throw
   // `OffsetOutOfRangeException` to let the caller handle it.
   // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
+  // - Fetched something but all of them are not valid date messages. 
In this case, the position
+  //   will be changed and we can use it to determine this case.
   val range = getAvailableOffsetRange()
   if (offset < range.earliest || offset >= range.latest) {
 throw new OffsetOutOfRangeException(
   Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
-  } else {
+  } else if (offsetBeforePoll == offsetAfterPoll) {
--- End diff --

Just to be clear, can this happen only if there is a timeout? And if so, 
why not push this condition and exception into the `poll()` method, thus 
simplifying this method?
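
As a sketch of the idea, assuming the timeout really is the only remaining case here, `poll` could raise it directly. Stand-in types below, not the real method:

```scala
// Hypothetical sketch: poll itself raises the timeout when it fetched nothing,
// so the caller no longer compares offsetBeforePoll with offsetAfterPoll.
import java.util.concurrent.TimeoutException

class SimplifiedPoller(records: Seq[(Long, String)], pollTimeoutMs: Long) {
  private var position: Long = 0L

  def poll(): Seq[(Long, String)] = {
    val batch = records.filter(_._1 >= position).take(10)
    if (batch.isEmpty) {
      throw new TimeoutException(
        s"Cannot fetch record for offset $position in $pollTimeoutMs milliseconds")
    }
    position = batch.last._1 + 1
    batch
  }
}
```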


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209476548
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
   pollTimeoutMs: Long,
   failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+if (offset != nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
+resetFetchedData()
+throw new MissingOffsetException(offset, offsetAfterPoll)
+  } else {
+seek(offset)
+poll(pollTimeoutMs)
+  }
 }
 
 if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
+  // We cannot fetch anything after `poll`. Three possible cases:
   // - `offset` is out of range so that Kafka returns nothing. Just 
throw
   // `OffsetOutOfRangeException` to let the caller handle it.
   // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
+  // - Fetched something but all of them are not valid date messages. 
In this case, the position
--- End diff --

date => data


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209478033
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
   pollTimeoutMs: Long,
   failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-  // This is the first fetch, or the last pre-fetched data has been 
drained.
+if (offset != nextOffsetInFetchedData) {
+  // This is the first fetch, or the fetched data has been reset.
   // Seek to the offset because we may call seekToBeginning or 
seekToEnd before this.
   seek(offset)
   poll(pollTimeoutMs)
+} else if (!fetchedData.hasNext) {
+  // The last pre-fetched data has been drained.
+  if (offset < offsetAfterPoll) {
+// Offsets in [offset, offsetAfterPoll) are missing. We should 
skip them.
+resetFetchedData()
+throw new MissingOffsetException(offset, offsetAfterPoll)
+  } else {
+seek(offset)
+poll(pollTimeoutMs)
+  }
 }
 
 if (!fetchedData.hasNext()) {
-  // We cannot fetch anything after `poll`. Two possible cases:
+  // We cannot fetch anything after `poll`. Three possible cases:
   // - `offset` is out of range so that Kafka returns nothing. Just 
throw
   // `OffsetOutOfRangeException` to let the caller handle it.
   // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
+  // - Fetched something but all of them are not valid date messages. 
In this case, the position
+  //   will be changed and we can use it to determine this case.
   val range = getAvailableOffsetRange()
   if (offset < range.earliest || offset >= range.latest) {
 throw new OffsetOutOfRangeException(
   Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
-  } else {
+  } else if (offsetBeforePoll == offsetAfterPoll) {
 throw new TimeoutException(
   s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+  } else {
+assert(offset <= offsetAfterPoll,
+  s"seek to $offset and poll but the offset was reset to 
$offsetAfterPoll")
+throw new MissingOffsetException(offset, offsetAfterPoll)
   }
 } else {
--- End diff --

Let's remove this else and reduce the condition nesting. The previous `if` 
statement always ends in an exception, so we can remove this else.
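
Illustrative shape of the flattened flow, using generic stand-in code rather than the real method:

```scala
object FlattenSketch {
  def nextRecord(fetched: Iterator[Int], offset: Long): Int = {
    if (!fetched.hasNext) {
      // ...every case in this branch ends in a throw...
      throw new IllegalStateException(s"nothing fetched for offset $offset")
    }
    // previously wrapped in `else { ... }`; it can sit at the top level instead
    fetched.next()
  }
}
```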


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209473432
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -31,6 +31,17 @@ import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.util.UninterruptibleThread
 
+/**
+ * An exception to indicate there is a missing offset in the records 
returned by Kafka consumer.
+ * This means it's either a transaction (commit or abort) marker, or an 
aborted message if
+ * "isolation.level" is "read_committed". The offsets in the range 
[offset, nextOffsetToFetch) are
+ * missing. In order words, `nextOffsetToFetch` indicates the next offset 
to fetch.
+ */
+private[kafka010] class MissingOffsetException(
+val offset: Long,
--- End diff --

maybe rename offset to something like missingOffset. It's weird to have a 
generically named field "offset" next to a specifically named field 
"nextOffsetToFetch".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209474755
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -251,32 +274,53 @@ private[kafka010] case class InternalKafkaConsumer(
   untilOffset: Long,
--- End diff --

Update the docs of this method to say that it can throw MissingOffsetException 
and what that means?
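
For example, a doc-only sketch with a stub body and a simplified signature (not the real method, and the exception definition is just a placeholder so the snippet compiles on its own):

```scala
object FetchDocSketch {
  class MissingOffsetException(val missingOffset: Long, val nextOffsetToFetch: Long)
    extends Exception(s"Offsets in [$missingOffset, $nextOffsetToFetch) are missing")

  /**
   * Get the record at `offset`, if it is available before `untilOffset`.
   *
   * @throws MissingOffsetException if offsets in [offset, nextOffsetToFetch)
   *         are missing (e.g. transaction markers); the caller should skip to
   *         `nextOffsetToFetch` and retry.
   */
  def fetchData(offset: Long, untilOffset: Long): Option[Array[Byte]] = None
}
```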


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22042: [SPARK-25005][SS]Support non-consecutive offsets ...

2018-08-12 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/22042#discussion_r209473316
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -95,6 +106,10 @@ private[kafka010] case class InternalKafkaConsumer(
 ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
   @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
 
+  @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET
--- End diff --

Can you add some docs to explain what these 2 vars signify and why they 
are needed?
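
A hypothetical doc sketch for the two fields; the wording is only my reading of the diff and would need to be verified against the actual behavior:

```scala
class OffsetTrackingDocSketch {
  private val UNKNOWN_OFFSET = -2L

  /** Consumer position recorded just before the last `poll`, used to tell
    * whether that poll actually returned anything. */
  @volatile private var offsetBeforePoll: Long = UNKNOWN_OFFSET

  /** Consumer position right after the last `poll`; offsets in
    * [requested offset, offsetAfterPoll) that were not returned were control
    * messages or aborted records and can be skipped. */
  @volatile private var offsetAfterPoll: Long = UNKNOWN_OFFSET
}
```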


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-08-08 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21469
  
I am having second thoughts about this. Exposing the entire memory usage 
of all the loaded maps as another custom metric just adds more confusion. 
Rather, the point of the main state metric `memoryUsedBytes` is to capture 
how much memory is occupied because of one partition of the state, and that 
should implicitly cover all the loaded versions of that state partition. So I 
strongly feel that instead of adding a custom metric, we should change the 
existing `memoryUsedBytes` to capture all the memory. 

I am fine with adding the custom metrics for hit and miss counts. No questions about 
that. 

What do you think?
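
A minimal sketch of the semantics argued for here, with stand-in types rather than the real StateStore metrics API:

```scala
// The partition-level memoryUsedBytes covers every loaded version of the state
// map, while hit/miss counts remain separate custom metrics.
class StateStoreMetricsSketch(loadedVersions: Map[Long, Array[Byte]]) {
  var cacheHitCount: Long = 0L
  var cacheMissCount: Long = 0L

  /** Memory attributed to this state partition across all loaded versions. */
  def memoryUsedBytes: Long = loadedVersions.values.map(_.length.toLong).sum
}
```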


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208485230
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schema of key and value in state are follow:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208483760
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
--- End diff --

nit: why is the input typed InternalRow when everything else is UnsafeRow? 
Seems inconsistent.
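
A sketch of the more uniform signature this nit points at, assuming the key is in practice always extracted from an UnsafeRow and that spark-catalyst is on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

trait KeyExtractorSketch {
  def getKey(row: UnsafeRow): UnsafeRow
}
```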


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208482355
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
--- End diff --

super nit: some of these can be compressed to a single line doc.
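
For example, a generic sketch of the compressed form:

```scala
trait DocCompressionSketch {
  /** Return an iterator containing all the keys in the target state store. */
  def keys(): Iterator[String]
}
```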


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208491512
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schema of key and value in state are follow:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208492158
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -167,6 +165,18 @@ trait WatermarkSupport extends UnaryExecNode {
   }
 }
   }
+
+  protected def removeKeysOlderThanWatermark(storeManager: 
StreamingAggregationStateManager,
--- End diff --

Incorrect indentation of the parameters.
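
Illustrative only, with stand-in parameter types: the layout Spark's Scala style expects for a multi-line signature (each parameter on its own line, 4-space continuation indent).

```scala
trait WatermarkCleanupStyleSketch {
  protected def removeKeysOlderThanWatermark(
      storeManager: AnyRef,
      store: AnyRef): Unit
}
```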



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208488566
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
--- End diff --

This really does not need to be in this interface, as this is not 
customized and is unlikely to ever be customized across implementations.
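
A generic sketch of the suggestion, with stand-in types: keep `commit` only on the base implementation rather than on the trait that implementations customize.

```scala
trait AggregationStateManagerSketch {
  def put(key: String, value: String): Unit
}

abstract class AggregationStateManagerBaseSketch extends AggregationStateManagerSketch {
  // Not part of the trait: identical for every implementation.
  final def commit(): Long = 0L
}
```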


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208491168
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StreamingAggregationStateManagerSuite extends StreamTest {
+  //  fields and method for test data 

+
+  val testKeys: Seq[String] = Seq("key1", "key2")
+  val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)")
+
+  val testOutputSchema: StructType = StructType(
+testKeys.map(createIntegerField) ++ testValues.map(createIntegerField))
+
+  val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes
+  val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p 
=>
+testKeys.contains(p.name)
+  }
+  val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { 
p =>
+testValues.contains(p.name)
+  }
+  val expectedTestValuesSchema: StructType = 
testValuesAttributes.toStructType
+
+  val testRow: UnsafeRow = {
+val unsafeRowProjection = UnsafeProjection.create(testOutputSchema)
+val row = unsafeRowProjection(new 
SpecificInternalRow(testOutputSchema))
+(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => 
row.setInt(index, index) }
+row
+  }
+
+  val expectedTestKeyRow: UnsafeRow = {
+val keyProjector = 
GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes)
+keyProjector(testRow)
+  }
+
+  val expectedTestValueRowForV2: UnsafeRow = {
+val valueProjector = 
GenerateUnsafeProjection.generate(testValuesAttributes,
+  testOutputAttributes)
+valueProjector(testRow)
+  }
+
+  private def createIntegerField(name: String): StructField = {
+StructField(name, IntegerType, nullable = false)
+  }
+
+  //  StateManagerImplV1 

+
+  test("StateManager v1 - get, put, iter") {
+val stateManager = newStateManager(testKeyAttributes, 
testOutputAttributes, 1)
+
+// in V1, input row is stored as value
+testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow,
+  expectedTestKeyRow, testRow)
+  }
+
+  //  StateManagerImplV2 

+  test("StateManager v2 - get, put, iter") {
+val stateManager = newStateManager(testKeyAttributes, 
testOutputAttributes, 2)
+
+// in V2, row for values itself (excluding keys from input row) is 
stored as value
+// so that stored value doesn't have key part, but state manager V2 
will provide same output
+// as V1 when getting row for key
+testGetPutIterOnStateManager(stateManager, expectedTestValuesSchema, 
testRow,
+  expectedTestKeyRow, expectedTestValueRowForV2)
+  }
+
+  private def newStateManager(
+  keysAttributes: Seq[Attribute],
+  inputRowAttributes: Seq[Attribute],
+  version: Int): StreamingAggregationStateManager = {
+StreamingAggregationStateManager.createStateManager(keysAttributes, 
inputRowAttributes, version)
+  }
+
+  private def testGetPutIterOnStateManager(
+  stateManager: StreamingAggregationStateManager,
+  expectedValueSchema: StructType,
+  inputRow: UnsafeRow

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208487198
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schema of key and value in state are follow:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208486837
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schema of key and value in state are follow:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208496351
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -201,33 +211,37 @@ object WatermarkSupport {
 case class StateStoreRestoreExec(
 keyExpressions: Seq[Attribute],
 stateInfo: Option[StatefulOperatorStateInfo],
+stateFormatVersion: Int,
 child: SparkPlan)
   extends UnaryExecNode with StateStoreReader {
 
+  private[sql] val stateManager = 
StreamingAggregationStateManager.createStateManager(
+keyExpressions, child.output, stateFormatVersion)
+
   override protected def doExecute(): RDD[InternalRow] = {
 val numOutputRows = longMetric("numOutputRows")
 
 child.execute().mapPartitionsWithStateStore(
   getStateInfo,
   keyExpressions.toStructType,
-  child.output.toStructType,
+  stateManager.getStateValueSchema,
   indexOrdinal = None,
   sqlContext.sessionState,
   Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
-val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
 val hasInput = iter.hasNext
 if (!hasInput && keyExpressions.isEmpty) {
   // If our `keyExpressions` are empty, we're getting a global 
aggregation. In that case
   // the `HashAggregateExec` will output a 0 value for the partial 
merge. We need to
   // restore the value, so that we don't overwrite our state with 
a 0 value, but rather
   // merge the 0 with existing state.
+  // In this case the value should represent origin row, so no 
need to restore.
--- End diff --

What does this mean? I think this is not needed anymore.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208490526
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StreamingAggregationStateManagerSuite extends StreamTest {
+  //  fields and method for test data 

+
+  val testKeys: Seq[String] = Seq("key1", "key2")
+  val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)")
+
+  val testOutputSchema: StructType = StructType(
+testKeys.map(createIntegerField) ++ testValues.map(createIntegerField))
+
+  val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes
+  val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p 
=>
+testKeys.contains(p.name)
+  }
+  val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { 
p =>
+testValues.contains(p.name)
+  }
+  val expectedTestValuesSchema: StructType = 
testValuesAttributes.toStructType
+
+  val testRow: UnsafeRow = {
+val unsafeRowProjection = UnsafeProjection.create(testOutputSchema)
+val row = unsafeRowProjection(new 
SpecificInternalRow(testOutputSchema))
+(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => 
row.setInt(index, index) }
+row
+  }
+
+  val expectedTestKeyRow: UnsafeRow = {
+val keyProjector = 
GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes)
+keyProjector(testRow)
+  }
+
+  val expectedTestValueRowForV2: UnsafeRow = {
+val valueProjector = 
GenerateUnsafeProjection.generate(testValuesAttributes,
+  testOutputAttributes)
+valueProjector(testRow)
+  }
+
+  private def createIntegerField(name: String): StructField = {
+StructField(name, IntegerType, nullable = false)
+  }
+
+  //  StateManagerImplV1 

+
+  test("StateManager v1 - get, put, iter") {
+val stateManager = newStateManager(testKeyAttributes, 
testOutputAttributes, 1)
+
+// in V1, input row is stored as value
+testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow,
+  expectedTestKeyRow, testRow)
+  }
+
+  //  StateManagerImplV2 

+  test("StateManager v2 - get, put, iter") {
+val stateManager = newStateManager(testKeyAttributes, 
testOutputAttributes, 2)
+
+// in V2, row for values itself (excluding keys from input row) is 
stored as value
+// so that stored value doesn't have key part, but state manager V2 
will provide same output
+// as V1 when getting row for key
+testGetPutIterOnStateManager(stateManager, expectedTestValuesSchema, 
testRow,
+  expectedTestKeyRow, expectedTestValueRowForV2)
+  }
+
+  private def newStateManager(
+  keysAttributes: Seq[Attribute],
+  inputRowAttributes: Seq[Attribute],
+  version: Int): StreamingAggregationStateManager = {
+StreamingAggregationStateManager.createStateManager(keysAttributes, 
inputRowAttributes, version)
--- End diff --

This is an absolutely shallow method, copying the exact same parameters to 
another method. What's the point of it?


---

-

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208490973
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManagerSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class StreamingAggregationStateManagerSuite extends StreamTest {
+  //  fields and method for test data 

+
+  val testKeys: Seq[String] = Seq("key1", "key2")
+  val testValues: Seq[String] = Seq("sum(key1)", "sum(key2)")
+
+  val testOutputSchema: StructType = StructType(
+testKeys.map(createIntegerField) ++ testValues.map(createIntegerField))
+
+  val testOutputAttributes: Seq[Attribute] = testOutputSchema.toAttributes
+  val testKeyAttributes: Seq[Attribute] = testOutputAttributes.filter { p 
=>
+testKeys.contains(p.name)
+  }
+  val testValuesAttributes: Seq[Attribute] = testOutputAttributes.filter { 
p =>
+testValues.contains(p.name)
+  }
+  val expectedTestValuesSchema: StructType = 
testValuesAttributes.toStructType
+
+  val testRow: UnsafeRow = {
+val unsafeRowProjection = UnsafeProjection.create(testOutputSchema)
+val row = unsafeRowProjection(new 
SpecificInternalRow(testOutputSchema))
+(testKeys ++ testValues).zipWithIndex.foreach { case (_, index) => 
row.setInt(index, index) }
+row
+  }
+
+  val expectedTestKeyRow: UnsafeRow = {
+val keyProjector = 
GenerateUnsafeProjection.generate(testKeyAttributes, testOutputAttributes)
+keyProjector(testRow)
+  }
+
+  val expectedTestValueRowForV2: UnsafeRow = {
+val valueProjector = 
GenerateUnsafeProjection.generate(testValuesAttributes,
+  testOutputAttributes)
+valueProjector(testRow)
+  }
+
+  private def createIntegerField(name: String): StructField = {
+StructField(name, IntegerType, nullable = false)
+  }
+
+  //  StateManagerImplV1 

+
+  test("StateManager v1 - get, put, iter") {
+val stateManager = newStateManager(testKeyAttributes, 
testOutputAttributes, 1)
+
+// in V1, input row is stored as value
+testGetPutIterOnStateManager(stateManager, testOutputSchema, testRow,
+  expectedTestKeyRow, testRow)
--- End diff --

nit: change the last param `testRow` to `expectedStateValue = testRow` to make it 
clear what it means, and to distinguish it from the previous `testRow` param.
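
A generic sketch of the named-argument call this nit suggests; the helper and parameter names here are illustrative, not the actual test helper:

```scala
object NamedArgSketch {
  // Naming the last argument at the call site makes clear which value is the
  // expected state value.
  def testGetPut(inputRow: String, expectedKeyRow: String, expectedStateValue: String): Unit = {
    assert(inputRow.nonEmpty && expectedKeyRow.nonEmpty && expectedStateValue.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    testGetPut("testRow", expectedKeyRow = "keyRow", expectedStateValue = "testRow")
  }
}
```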


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208485192
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for state manager purposed to be used from streaming 
aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract columns consisting key from input row, and return the new 
row for key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value 
of state.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key to the target state store. Note 
that key will be
+ * extracted from the input row, and the key would be same as the 
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in target 
state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schema of key and value in state are follow:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208486899
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for the state manager intended to be used by streaming aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract the key columns from the input row and return a new row containing only the key columns.
+ *
+ * @param row The input row.
+ * @return A row instance that contains only the key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate the schema for the state value. The schema is mainly passed to the StateStoreRDD.
+ *
+ * @return A StructType instance representing the schema of the state value.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key into the target state store. Note that the key
+ * is extracted from the input row, and it is the same as the result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key to be removed.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in the target state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in the target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in the target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schemas of the key and the value in the state are as follows:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208491615
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for the state manager intended to be used by streaming aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract the key columns from the input row and return a new row containing only the key columns.
+ *
+ * @param row The input row.
+ * @return A row instance that contains only the key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate the schema for the state value. The schema is mainly passed to the StateStoreRDD.
+ *
+ * @return A StructType instance representing the schema of the state value.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key into the target state store. Note that the key
+ * is extracted from the input row, and it is the same as the result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+def commit(store: StateStore): Long
+
+/**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key to be removed.
+ */
+def remove(store: StateStore, key: UnsafeRow): Unit
+
+/**
+ * Return an iterator containing all the key-value pairs in the target state store.
+ */
+def iterator(store: StateStore): Iterator[UnsafeRowPair]
+
+/**
+ * Return an iterator containing all the keys in the target state store.
+ */
+def keys(store: StateStore): Iterator[UnsafeRow]
+
+/**
+ * Return an iterator containing all the values in the target state store.
+ */
+def values(store: StateStore): Iterator[UnsafeRow]
+  }
+
+  object StreamingAggregationStateManager extends Logging {
+val supportedVersions = Seq(1, 2)
+val legacyVersion = 1
+
+def createStateManager(
+keyExpressions: Seq[Attribute],
+inputRowAttributes: Seq[Attribute],
+stateFormatVersion: Int): StreamingAggregationStateManager = {
+  stateFormatVersion match {
+case 1 => new 
StreamingAggregationStateManagerImplV1(keyExpressions, inputRowAttributes)
+case 2 => new 
StreamingAggregationStateManagerImplV2(keyExpressions, inputRowAttributes)
+case _ => throw new IllegalArgumentException(s"Version 
$stateFormatVersion is invalid")
+  }
+}
+  }
+
+  abstract class StreamingAggregationStateManagerBaseImpl(
+  protected val keyExpressions: Seq[Attribute],
+  protected val inputRowAttributes: Seq[Attribute]) extends 
StreamingAggregationStateManager {
+
+@transient protected lazy val keyProjector =
+  GenerateUnsafeProjection.generate(keyExpressions, inputRowAttributes)
+
+def getKey(row: InternalRow): UnsafeRow = keyProjector(row)
+
+override def commit(store: StateStore): Long = store.commit()
+
+override def remove(store: StateStore, key: UnsafeRow): Unit = 
store.remove(key)
+
+override def keys(store: StateStore): Iterator[UnsafeRow] = {
+  // discard and don't convert values to avoid computation
+  store.getRange(None, None).map(_.key)
+}
+  }
+
+  /**
+   * The implementation of StreamingAggregationStateManager for state 
version 1.
+   * In state version 1, the schemas of the key and the value in the state are as follows:
+   *
+   * - key: Same as key expressions.
+   * - value: Same as input row attributes. The schema of value contains 
key 

[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208483242
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for the state manager intended to be used by streaming aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract the key columns from the input row and return a new row containing only the key columns.
+ *
+ * @param row The input row.
+ * @return A row instance that contains only the key columns.
--- End diff --

nit: a lot of the `@param` and `@return` tags in the docs are a bit superfluous, 
as they just repeat what the main statement already says. 
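
For example, a trimmed version of the `get` doc could be as short as this (just a sketch of the style, not a change made in the PR):

```scala
/** Return the value for the given non-null key, or null if the key is not present in the store. */
def get(store: StateStore, key: UnsafeRow): UnsafeRow
```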


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208481983
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
--- End diff --

Ummm why is it in this package object and not in a separate file?? Is there 
any reason it has to be in the `state` package object when not all stateful operators 
require it, only streaming aggregation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208489469
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -871,6 +871,16 @@ object SQLConf {
 .intConf
 .createWithDefault(2)
 
+  val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
+buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
+  .internal()
+  .doc("State format version used by streaming aggregation operations 
in a streaming query. " +
+"State between versions are tend to be incompatible, so state 
format version shouldn't " +
+"be modified after running.")
+  .intConf
+  .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+  .createWithDefault(2)
--- End diff --

If you intend to change the default to the new version, then you HAVE TO 
add a test that ensures that existing streaming aggregation checkpoints 
(generated in Spark 2.3.1 for example) will not fail to recover.

Similar to this test - 
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala#L883
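
A minimal sketch of what such a recovery test could look like (the checkpoint resource path, the seed data, and the expected counts are hypothetical; the real test would need a checkpoint directory generated by Spark 2.3.1 committed under the test resources):

```scala
import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.util.Utils

// Assumed to live inside a suite that mixes in StreamTest (e.g. StreamingAggregationSuite),
// so testStream, StartStream, AddData, CheckLastBatch and testImplicits are in scope.
test("recovery from a streaming aggregation checkpoint written by Spark 2.3.1") {
  import testImplicits._

  // Hypothetical resource: a checkpoint produced by running the same query on Spark 2.3.1
  // and committed under the test resources.
  val resourceUri = this.getClass.getResource(
    "/structured-streaming/checkpoint-version-2.3.1-streaming-aggregation/").toURI
  val checkpointDir = Utils.createTempDir().getCanonicalFile
  FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

  val inputData = MemoryStream[Int]
  val aggregated = inputData.toDF().groupBy("value").count().as[(Int, Long)]

  testStream(aggregated, OutputMode.Update())(
    // Restarting from the old checkpoint must not fail, even though the default
    // state format version has changed in the meantime.
    StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
    AddData(inputData, 3),
    // The expected count depends on what data was committed in the 2.3.1 checkpoint;
    // here we assume the old run had already seen the value 3 once.
    CheckLastBatch((3, 2L))
  )
}
```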


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208482479
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
@@ -81,4 +85,221 @@ package object state {
 storeCoordinator)
 }
   }
+
+  /**
+   * Base trait for the state manager intended to be used by streaming aggregations.
+   */
+  sealed trait StreamingAggregationStateManager extends Serializable {
+
+/**
+ * Extract the key columns from the input row and return a new row containing only the key columns.
+ *
+ * @param row The input row.
+ * @return A row instance that contains only the key columns.
+ */
+def getKey(row: InternalRow): UnsafeRow
+
+/**
+ * Calculate the schema for the state value. The schema is mainly passed to the StateStoreRDD.
+ *
+ * @return A StructType instance representing the schema of the state value.
+ */
+def getStateValueSchema: StructType
+
+/**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise 
null.
+ */
+def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+/**
+ * Put a new value for a non-null key into the target state store. Note that the key
+ * is extracted from the input row, and it is the same as the result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+def put(store: StateStore, row: UnsafeRow): Unit
+
+/**
+ * Commit all the updates that have been made to the target state 
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
--- End diff --

Superfluous: the main statement already has all the information.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21733: [SPARK-24763][SS] Remove redundant key data from ...

2018-08-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21733#discussion_r208492352
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -167,6 +165,18 @@ trait WatermarkSupport extends UnaryExecNode {
   }
 }
   }
+
+  protected def removeKeysOlderThanWatermark(storeManager: 
StreamingAggregationStateManager,
--- End diff --

Actually... where is this used? This does not seem to be used anywhere?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21622#discussion_r206761192
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", 
_.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+progress => 
convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", 
_.stateOperators.map(_.numRowsTotal).sum, 0L)
+  registerGauge("states-usedBytes", 
_.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
--- End diff --

Those are custom metrics, which may or may not be present depending on the 
state store implementation. I don't recommend adding them here directly.
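
If one of those provider-specific metrics ever did need to be surfaced here, the lookup would at least have to tolerate its absence, roughly along these lines (a sketch only; `loadedMapCacheHitCount` is just an example name from the HDFS-backed provider, and `customMetrics` is assumed to be the `java.util.Map[String, java.lang.Long]` on `StateOperatorProgress`):

```scala
// Sketch inside MetricsReporter: fall back to 0 for state store implementations
// that do not publish this particular custom metric.
registerGauge("states-loadedMapCacheHitCount",
  progress => progress.stateOperators
    .map(_.customMetrics.getOrDefault("loadedMapCacheHitCount", 0L).longValue())
    .sum,
  0L)
```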


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206739252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala 
---
@@ -81,10 +81,10 @@ class SQLMetric(val metricType: String, initValue: Long 
= 0L) extends Accumulato
 }
 
 object SQLMetrics {
-  private val SUM_METRIC = "sum"
-  private val SIZE_METRIC = "size"
-  private val TIMING_METRIC = "timing"
-  private val AVERAGE_METRIC = "average"
+  val SUM_METRIC = "sum"
+  val SIZE_METRIC = "size"
+  val TIMING_METRIC = "timing"
+  val AVERAGE_METRIC = "average"
--- End diff --

why this change?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206738287
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-("numRowsTotal" -> JInt(numRowsTotal)) ~
-("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => 
JValue): JValue = {
+  if (map.isEmpty) return JNothing
+  val keys = map.keySet.asScala.toSeq.sorted
+  keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ 
~ _)
+}
+
+val jsonVal = ("numRowsTotal" -> JInt(numRowsTotal)) ~
+  ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+  ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+
+if (!customMetrics.isEmpty) {
--- End diff --

You are already handling the case of the map being empty in `safeMapToJValue` 
by returning JNothing. Don't JNothing values just get dropped from the JSON text 
anyway?
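
A standalone json4s check of that behaviour (not code from the PR) shows that object fields whose value is `JNothing` are dropped when rendering:

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JNothingRenderCheck extends App {
  val withMetrics =
    ("memoryUsedBytes" -> JInt(512)) ~ ("customMetrics" -> JObject("hit" -> JInt(1)))
  val withoutMetrics =
    ("memoryUsedBytes" -> JInt(512)) ~ ("customMetrics" -> JNothing)

  // A field whose value is JNothing disappears during rendering, so an extra
  // emptiness check around the field should not be necessary.
  println(compact(render(withMetrics)))    // {"memoryUsedBytes":512,"customMetrics":{"hit":1}}
  println(compact(render(withoutMetrics))) // {"memoryUsedBytes":512}
}
```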


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-07-31 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r206738919
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,24 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, customMetrics)
 
   private[sql] def jsonValue: JValue = {
-("numRowsTotal" -> JInt(numRowsTotal)) ~
-("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => 
JValue): JValue = {
--- End diff --

T is always Long. Why make a generic function for that? This does not even 
need a separate function. 
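
Inlined, the non-generic version could look roughly like this sketch (assuming `customMetrics` is the `java.util.Map[String, java.lang.Long]` field and the file's existing json4s and JavaConverters imports):

```scala
// Sketch: build the JSON value for customMetrics without a generic helper.
val customMetricsJson: JValue =
  if (customMetrics.isEmpty) {
    JNothing
  } else {
    customMetrics.asScala.toSeq.sortBy(_._1)
      .map { case (k, v) => (k -> JInt(v.longValue())): JObject }
      .reduce(_ ~ _)
  }
```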


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-07-31 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21357
  
While the new code looks a bit cleaner, I am a bit hesitant about this complete 
rewrite of the entire critical code path. We generally refactor code 
only if there is some ultimate benefit, e.g. a performance improvement, or making it 
easier to add a new feature.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


