spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

2016-10-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b113b5d9f -> 3e9840f1d


[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured 
stream

## What changes were proposed in this pull request?

startingOffsets takes specific per-TopicPartition offsets as a JSON argument, 
usable with any consumer strategy.

assign with specific TopicPartitions as a consumer strategy.

## How was this patch tested?

Unit tests

Author: cody koeninger 

Closes #15504 from koeninger/SPARK-17812.

(cherry picked from commit 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e9840f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e9840f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e9840f1

Branch: refs/heads/branch-2.0
Commit: 3e9840f1d923a521d64bfc55fcbb6babd6045f06
Parents: b113b5d
Author: cody koeninger 
Authored: Fri Oct 21 15:55:04 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 21 15:55:11 2016 -0700

--
 docs/structured-streaming-kafka-integration.md  |  38 +--
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  93 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  64 +--
 .../sql/kafka010/KafkaSourceProvider.scala  |  52 -
 .../spark/sql/kafka010/StartingOffsets.scala|  32 ++
 .../spark/sql/kafka010/JsonUtilsSuite.scala |  45 
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 114 +--
 .../spark/sql/kafka010/KafkaTestUtils.scala |  14 ++-
 8 files changed, 395 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3e9840f1/docs/structured-streaming-kafka-integration.md
--
diff --git a/docs/structured-streaming-kafka-integration.md 
b/docs/structured-streaming-kafka-integration.md
index 668489a..e851f21 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -151,15 +151,24 @@ The following options must be set for the Kafka source.
 
 Option | value | meaning
 
+  assign
+  json string {"topicA":[0,1],"topicB":[2,4]}
+  Specific TopicPartitions to consume.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.
+
+
   subscribe
   A comma-separated list of topics
-  The topic list to subscribe. Only one of "subscribe" and 
"subscribePattern" options can be
-  specified for Kafka source.
+  The topic list to subscribe.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.
 
 
   subscribePattern
   Java regex string
-  The pattern used to subscribe the topic. Only one of "subscribe" and 
"subscribePattern"
+  The pattern used to subscribe to topic(s).
+  Only one of "assign", "subscribe" or "subscribePattern"
   options can be specified for Kafka source.
 
 
@@ -174,16 +183,21 @@ The following configurations are optional:
 
 Option | value | default | meaning
 
-  startingOffset
-  ["earliest", "latest"]
-  "latest"
-  The start point when a query is started, either "earliest" which is from 
the earliest offset, 
-  or "latest" which is just from the latest offset. Note: This only applies when a new Streaming
-  query is started, and that resuming will always pick up from where the query left off.
+  startingOffsets
+  earliest, latest, or json string
+  {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+  
+  latest
+  The start point when a query is started, either "earliest" which is from 
the earliest offsets,
+  "latest" which is just from the latest offsets, or a json string specifying 
a starting offset for
+  each TopicPartition.  In the json, -2 as an offset can be used to refer to 
earliest, -1 to latest.
+  Note: This only applies when a new Streaming query is started, and that 
resuming will always pick
+  up from where the query left off. Newly discovered partitions during a query 
will start at
+  earliest.
 
 
   failOnDataLoss
-  [true, false]
+  true or false
   true
   Whether to fail the query when it's possible that data is lost (e.g., 
topics are deleted, or 
   offsets are out of range). This may be a false alarm. You can disable it 
when it doesn't work
@@ -215,10 +229,10 @@ Kafka's own configurations can be set via 
`DataStreamReader.option` with `kafka.
 
 Note that the following Kafka params cannot be set and the Kafka source will 
throw an exception:
 - **group.id**: Kafka source will create a unique group id for each query 
automatically.
-- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` 
or `latest` to specify
+- **auto.offset.reset**: Set the source option `startingOffsets` to specify
 where to start instead. Structured Streaming manages which offsets are 
 consumed internally, rather than rely on the kafka Consumer to do it.

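The JSON formats for the new `assign` and `startingOffsets` options shown in the tables above can be illustrated with a small Python sketch (illustrative only; the actual parsing lives in JsonUtils.scala, and the topic names here are hypothetical):

```python
import json

# "assign" takes a map of topic -> list of partition numbers to consume.
assign = json.dumps({"topicA": [0, 1], "topicB": [2, 4]})

# "startingOffsets" takes a map of topic -> {partition (as string): offset}.
# Sentinel offsets: -2 means earliest, -1 means latest.
starting_offsets = json.dumps({
    "topicA": {"0": 23, "1": -1},  # partition 0 from offset 23, partition 1 from latest
    "topicB": {"0": -2},           # partition 0 from earliest
})

print(assign)
print(starting_offsets)
```

Both strings would be passed verbatim as option values, e.g. via `DataStreamReader.option("startingOffsets", ...)`.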
spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

2016-10-21 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 140570252 -> 268ccb9a4


[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured 
stream

## What changes were proposed in this pull request?

startingOffsets takes specific per-TopicPartition offsets as a JSON argument, 
usable with any consumer strategy.

assign with specific TopicPartitions as a consumer strategy.

## How was this patch tested?

Unit tests

Author: cody koeninger 

Closes #15504 from koeninger/SPARK-17812.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/268ccb9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/268ccb9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/268ccb9a

Branch: refs/heads/master
Commit: 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307
Parents: 1405702
Author: cody koeninger 
Authored: Fri Oct 21 15:55:04 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 21 15:55:04 2016 -0700

--
 docs/structured-streaming-kafka-integration.md  |  38 +--
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  93 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  64 +--
 .../sql/kafka010/KafkaSourceProvider.scala  |  52 -
 .../spark/sql/kafka010/StartingOffsets.scala|  32 ++
 .../spark/sql/kafka010/JsonUtilsSuite.scala |  45 
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 114 +--
 .../spark/sql/kafka010/KafkaTestUtils.scala |  14 ++-
 8 files changed, 395 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/docs/structured-streaming-kafka-integration.md
--
diff --git a/docs/structured-streaming-kafka-integration.md 
b/docs/structured-streaming-kafka-integration.md
index 668489a..e851f21 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -151,15 +151,24 @@ The following options must be set for the Kafka source.
 
 Option | value | meaning
 
+  assign
+  json string {"topicA":[0,1],"topicB":[2,4]}
+  Specific TopicPartitions to consume.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.
+
+
   subscribe
   A comma-separated list of topics
-  The topic list to subscribe. Only one of "subscribe" and 
"subscribePattern" options can be
-  specified for Kafka source.
+  The topic list to subscribe.
+  Only one of "assign", "subscribe" or "subscribePattern"
+  options can be specified for Kafka source.
 
 
   subscribePattern
   Java regex string
-  The pattern used to subscribe the topic. Only one of "subscribe" and 
"subscribePattern"
+  The pattern used to subscribe to topic(s).
+  Only one of "assign", "subscribe" or "subscribePattern"
   options can be specified for Kafka source.
 
 
@@ -174,16 +183,21 @@ The following configurations are optional:
 
 Option | value | default | meaning
 
-  startingOffset
-  ["earliest", "latest"]
-  "latest"
-  The start point when a query is started, either "earliest" which is from 
the earliest offset, 
-  or "latest" which is just from the latest offset. Note: This only applies when a new Streaming
-  query is started, and that resuming will always pick up from where the query left off.
+  startingOffsets
+  earliest, latest, or json string
+  {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+  
+  latest
+  The start point when a query is started, either "earliest" which is from 
the earliest offsets,
+  "latest" which is just from the latest offsets, or a json string specifying 
a starting offset for
+  each TopicPartition.  In the json, -2 as an offset can be used to refer to 
earliest, -1 to latest.
+  Note: This only applies when a new Streaming query is started, and that 
resuming will always pick
+  up from where the query left off. Newly discovered partitions during a query 
will start at
+  earliest.
 
 
   failOnDataLoss
-  [true, false]
+  true or false
   true
   Whether to fail the query when it's possible that data is lost (e.g., 
topics are deleted, or 
   offsets are out of range). This may be a false alarm. You can disable it 
when it doesn't work
@@ -215,10 +229,10 @@ Kafka's own configurations can be set via 
`DataStreamReader.option` with `kafka.
 
 Note that the following Kafka params cannot be set and the Kafka source will 
throw an exception:
 - **group.id**: Kafka source will create a unique group id for each query 
automatically.
-- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` 
or `latest` to specify
+- **auto.offset.reset**: Set the source option `startingOffsets` to specify
  where to start instead. Structured Streaming manages which offsets are 
consumed internally, rather 
 than rely on the kafka Consumer to do it.
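The "only one of" constraint repeated in the option tables above can be sketched as a small validation (a hypothetical mirror of the check performed in KafkaSourceProvider.scala, not the actual implementation):

```python
# Exactly one consumer strategy must be chosen for the Kafka source:
# "assign", "subscribe", or "subscribePattern".
STRATEGY_KEYS = ("assign", "subscribe", "subscribePattern")

def validate_strategy(options: dict) -> str:
    """Return the chosen strategy key; raise if zero or several are set."""
    chosen = [k for k in STRATEGY_KEYS if k in options]
    if len(chosen) != 1:
        raise ValueError(
            f"Exactly one of {STRATEGY_KEYS} must be specified, got {chosen}")
    return chosen[0]
```

For example, `validate_strategy({"subscribe": "topicA,topicB"})` succeeds, while passing both `assign` and `subscribe` raises an error, matching the "Kafka source will throw an exception" behavior described above.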