spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b113b5d9f -> 3e9840f1d

[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

## What changes were proposed in this pull request?

- startingOffsets takes specific per-TopicPartition offsets as a json argument, usable with any consumer strategy
- assign with specific TopicPartitions as a consumer strategy

## How was this patch tested?

Unit tests

Author: cody koeninger

Closes #15504 from koeninger/SPARK-17812.

(cherry picked from commit 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e9840f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e9840f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e9840f1

Branch: refs/heads/branch-2.0
Commit: 3e9840f1d923a521d64bfc55fcbb6babd6045f06
Parents: b113b5d
Author: cody koeninger
Authored: Fri Oct 21 15:55:04 2016 -0700
Committer: Shixiong Zhu
Committed: Fri Oct 21 15:55:11 2016 -0700

--
 docs/structured-streaming-kafka-integration.md  |  38 +--
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  93 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  64 +--
 .../sql/kafka010/KafkaSourceProvider.scala      |  52 -
 .../spark/sql/kafka010/StartingOffsets.scala    |  32 ++
 .../spark/sql/kafka010/JsonUtilsSuite.scala     |  45
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 114 +--
 .../spark/sql/kafka010/KafkaTestUtils.scala     |  14 ++-
 8 files changed, 395 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3e9840f1/docs/structured-streaming-kafka-integration.md
--
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 668489a..e851f21 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -151,15 +151,24 @@ The following options must be set for the Kafka source.
  Option | value | meaning

+ assign
+ json string {"topicA":[0,1],"topicB":[2,4]}
+ Specific TopicPartitions to consume.
+ Only one of "assign", "subscribe" or "subscribePattern"
+ options can be specified for Kafka source.
+
  subscribe
  A comma-separated list of topics
- The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be
- specified for Kafka source.
+ The topic list to subscribe.
+ Only one of "assign", "subscribe" or "subscribePattern"
+ options can be specified for Kafka source.

  subscribePattern
  Java regex string
- The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern"
+ The pattern used to subscribe to topic(s).
+ Only one of "assign", "subscribe" or "subscribePattern"
  options can be specified for Kafka source.

@@ -174,16 +183,21 @@ The following configurations are optional:

  Option | value | default | meaning

- startingOffset
- ["earliest", "latest"]
- "latest"
- The start point when a query is started, either "earliest" which is from the earliest offset,
- or "latest" which is just from the latest offset. Note: This only applies when a new Streaming
- query is started, and that resuming will always pick up from where the query left off.
+ startingOffsets
+ earliest, latest, or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+ latest
+ The start point when a query is started, either "earliest" which is from the earliest offsets,
+ "latest" which is just from the latest offsets, or a json string specifying a starting offset for
+ each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
+ Note: This only applies when a new Streaming query is started, and that resuming will always pick
+ up from where the query left off. Newly discovered partitions during a query will start at
+ earliest.
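The per-TopicPartition `startingOffsets` value described in the diff above is an ordinary JSON string, so it can be built programmatically rather than hand-written. A minimal sketch in Python, using the topic names from the docs table (which are themselves placeholder names, not real topics):

```python
import json

# Per-TopicPartition starting positions, as described in the docs diff:
# -2 means "earliest" for that partition, -1 means "latest".
starting = {
    "topicA": {"0": 23, "1": -1},  # partition 0 at offset 23, partition 1 at latest
    "topicB": {"0": -2},           # partition 0 at earliest
}

# The Kafka source expects the option value as a JSON string.
starting_offsets = json.dumps(starting)
print(starting_offsets)
```

Note that partition numbers appear as JSON object keys, so they are strings ("0", "1") even though they are numeric, while the offsets themselves are JSON numbers.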
  failOnDataLoss
- [true, false]
+ true or false
  true
  Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
  offsets are out of range). This may be a false alarm. You can disable it when it doesn't work

@@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.

  Note that the following Kafka params cannot be set and the Kafka source will throw an exception:

  - **group.id**: Kafka source will create a unique group id for each query automatically.
- - **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
+ - **auto.offset.reset**: Set the source option `startingOffsets` to specify
    where to start instead. Structured Streaming manages which offsets are consumed internally,
    rather than rely on the kafka Consumer to do it.
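Taken together, the new `assign` strategy and the `startingOffsets` option are both plain JSON strings handed to the source. A hedged sketch of such an option map in Python — the dict stands in for repeated `DataStreamReader.option` calls, and the broker addresses and topic names are hypothetical:

```python
import json

# Hypothetical option map for a Kafka source using the options from this patch.
options = {
    "kafka.bootstrap.servers": "host1:9092,host2:9092",  # hypothetical brokers
    # "assign": consume only these specific TopicPartitions.
    "assign": json.dumps({"topicA": [0, 1], "topicB": [2, 4]}),
    # "startingOffsets": per-partition start; -2 = earliest, -1 = latest.
    "startingOffsets": json.dumps({"topicA": {"0": 23, "1": -1},
                                   "topicB": {"0": -2}}),
}

for key, value in sorted(options.items()):
    print(f"{key} = {value}")
```

Per the table above, `assign` is mutually exclusive with `subscribe` and `subscribePattern`, so an option map would carry only one of the three.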
spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream
Repository: spark
Updated Branches:
  refs/heads/master 140570252 -> 268ccb9a4

[SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream

## What changes were proposed in this pull request?

- startingOffsets takes specific per-TopicPartition offsets as a json argument, usable with any consumer strategy
- assign with specific TopicPartitions as a consumer strategy

## How was this patch tested?

Unit tests

Author: cody koeninger

Closes #15504 from koeninger/SPARK-17812.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/268ccb9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/268ccb9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/268ccb9a

Branch: refs/heads/master
Commit: 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307
Parents: 1405702
Author: cody koeninger
Authored: Fri Oct 21 15:55:04 2016 -0700
Committer: Shixiong Zhu
Committed: Fri Oct 21 15:55:04 2016 -0700

--
 docs/structured-streaming-kafka-integration.md  |  38 +--
 .../apache/spark/sql/kafka010/JsonUtils.scala   |  93 +++
 .../apache/spark/sql/kafka010/KafkaSource.scala |  64 +--
 .../sql/kafka010/KafkaSourceProvider.scala      |  52 -
 .../spark/sql/kafka010/StartingOffsets.scala    |  32 ++
 .../spark/sql/kafka010/JsonUtilsSuite.scala     |  45
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 114 +--
 .../spark/sql/kafka010/KafkaTestUtils.scala     |  14 ++-
 8 files changed, 395 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/docs/structured-streaming-kafka-integration.md
--
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 668489a..e851f21 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -151,15 +151,24 @@ The following options must be set for the Kafka source.
  Option | value | meaning

+ assign
+ json string {"topicA":[0,1],"topicB":[2,4]}
+ Specific TopicPartitions to consume.
+ Only one of "assign", "subscribe" or "subscribePattern"
+ options can be specified for Kafka source.
+
  subscribe
  A comma-separated list of topics
- The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be
- specified for Kafka source.
+ The topic list to subscribe.
+ Only one of "assign", "subscribe" or "subscribePattern"
+ options can be specified for Kafka source.

  subscribePattern
  Java regex string
- The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern"
+ The pattern used to subscribe to topic(s).
+ Only one of "assign", "subscribe" or "subscribePattern"
  options can be specified for Kafka source.

@@ -174,16 +183,21 @@ The following configurations are optional:

  Option | value | default | meaning

- startingOffset
- ["earliest", "latest"]
- "latest"
- The start point when a query is started, either "earliest" which is from the earliest offset,
- or "latest" which is just from the latest offset. Note: This only applies when a new Streaming
- query is started, and that resuming will always pick up from where the query left off.
+ startingOffsets
+ earliest, latest, or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+ latest
+ The start point when a query is started, either "earliest" which is from the earliest offsets,
+ "latest" which is just from the latest offsets, or a json string specifying a starting offset for
+ each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
+ Note: This only applies when a new Streaming query is started, and that resuming will always pick
+ up from where the query left off. Newly discovered partitions during a query will start at
+ earliest.
  failOnDataLoss
- [true, false]
+ true or false
  true
  Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
  offsets are out of range). This may be a false alarm. You can disable it when it doesn't work

@@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.

  Note that the following Kafka params cannot be set and the Kafka source will throw an exception:

  - **group.id**: Kafka source will create a unique group id for each query automatically.
- - **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
+ - **auto.offset.reset**: Set the source option `startingOffsets` to specify
    where to start instead. Structured Streaming manages which offsets are consumed internally,
    rather than rely on the kafka Consumer to do it.