[FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

This closes #5056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4083c70d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4083c70d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4083c70d

Branch: refs/heads/master
Commit: 4083c70dc88e0022daaab807d67b922d426fb533
Parents: 458c909
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Thu Nov 23 00:00:39 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Thu Nov 23 14:49:14 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  50 +++++++-
 .../kafka/Kafka010AvroTableSource.java          |   2 +-
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka011AvroTableSource.java          |   2 +-
 .../kafka/Kafka011JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/Kafka08AvroTableSource.java           |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka09AvroTableSource.java           |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSource.java      | 126 ++++++++++++++++++-
 .../kafka/KafkaTableSourceTestBase.java         |  44 +++++++
 17 files changed, 230 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7387358..aaf23bc 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -145,7 +145,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 </div>
 </div>
 
-* **Missing Field Handling** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
+* **Missing Field Handling:** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -169,6 +169,30 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### KafkaAvroTableSource
@@ -265,6 +289,30 @@ val source: TableSource[_] = 
Kafka010AvroTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010AvroTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### Configuring a Processing Time Attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index fbc58ea..660162a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka010AvroTableSource extends 
KafkaAvroTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer010<>(topic, 
deserializationSchema, properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bbdb32f..5f9984e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka010JsonTableSource extends 
KafkaJsonTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer010<>(topic, 
deserializationSchema, properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index bc675eb..379c562 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka010TableSource extends 
KafkaTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer010<>(topic, 
deserializationSchema, properties);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index af3b5af..a9a109c 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka011AvroTableSource extends 
KafkaAvroTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 71158f6..cee7c61 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka011JsonTableSource extends 
KafkaJsonTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index dbf980b..8c40318 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka011TableSource extends 
KafkaTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer011<>(topic, 
deserializationSchema, properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 8f45881..9105c73 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka08AvroTableSource extends 
KafkaAvroTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer08<>(topic, deserializationSchema, 
properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index b3b37c6..639093d 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka08JsonTableSource extends 
KafkaJsonTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer08<>(topic, deserializationSchema, 
properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8270b78..3bb6a94 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka08TableSource extends 
KafkaTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer08<>(topic, deserializationSchema, 
properties);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index 808be01..fb8496a 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka09AvroTableSource extends 
KafkaAvroTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index a699d65..ded23b0 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka09JsonTableSource extends 
KafkaJsonTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 1d2c028..df15452 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka09TableSource extends 
KafkaTableSource {
        }
 
        @Override
-       FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties 
properties, DeserializationSchema<Row> deserializationSchema) {
+       protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, 
Properties properties, DeserializationSchema<Row> deserializationSchema) {
                return new FlinkKafkaConsumer09<>(topic, deserializationSchema, 
properties);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 8cea36c..055b679 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -44,7 +44,7 @@ import java.util.Properties;
  * A version-agnostic Kafka Avro {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, 
DeserializationSchema)}.
  */
 public abstract class KafkaAvroTableSource extends KafkaTableSource implements 
DefinedFieldMapping {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 9a6525c..6806673 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -32,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka JSON {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, 
DeserializationSchema)}.
  *
  * <p>The field names are used to parse the JSON file and so are the types.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 3291f7d..d0ee7de 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
@@ -37,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import scala.Option;
@@ -45,7 +48,7 @@ import scala.Option;
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, 
DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, 
DeserializationSchema)}.
  */
 public abstract class KafkaTableSource
        implements StreamTableSource<Row>, DefinedProctimeAttribute, 
DefinedRowtimeAttributes {
@@ -68,6 +71,12 @@ public abstract class KafkaTableSource
        /** Descriptor for a rowtime attribute. */
        private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
 
+       /** The startup mode for the contained consumer (default is {@link 
StartupMode#GROUP_OFFSETS}). */
+       private StartupMode startupMode;
+
+       /** Specific startup offsets; only relevant when startup mode is {@link 
StartupMode#SPECIFIC_OFFSETS}. */
+       private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
        /**
         * Creates a generic Kafka {@link StreamTableSource}.
         *
@@ -121,6 +130,37 @@ public abstract class KafkaTableSource
                return rowtimeAttributeDescriptors;
        }
 
+       /**
+        * Returns a version-specific Kafka consumer with the start position 
configured.
+        *
+        * @param topic                 Kafka topic to consume.
+        * @param properties            Properties for the Kafka consumer.
+        * @param deserializationSchema Deserialization schema to use for Kafka 
records.
+        * @return The version-specific Kafka consumer
+        */
+       protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+                       String topic,
+                       Properties properties,
+                       DeserializationSchema<Row> deserializationSchema) {
+               FlinkKafkaConsumerBase<Row> kafkaConsumer =
+                               createKafkaConsumer(topic, properties, 
deserializationSchema);
+               switch (startupMode) {
+                       case EARLIEST:
+                               kafkaConsumer.setStartFromEarliest();
+                               break;
+                       case LATEST:
+                               kafkaConsumer.setStartFromLatest();
+                               break;
+                       case GROUP_OFFSETS:
+                               kafkaConsumer.setStartFromGroupOffsets();
+                               break;
+                       case SPECIFIC_OFFSETS:
+                               
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+                               break;
+               }
+               return kafkaConsumer;
+       }
+
        //////// SETTERS FOR OPTIONAL PARAMETERS
 
        /**
@@ -160,17 +200,35 @@ public abstract class KafkaTableSource
                this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
        }
 
+       /**
+        * Sets the startup mode of the TableSource.
+        *
+        * @param startupMode The startup mode.
+        */
+       protected void setStartupMode(StartupMode startupMode) {
+               this.startupMode = startupMode;
+       }
+
+       /**
+        * Sets the startup offsets of the TableSource; only relevant when the 
startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
+        *
+        * @param specificStartupOffsets The startup offsets for different 
partitions.
+        */
+       protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> 
specificStartupOffsets) {
+               this.specificStartupOffsets = specificStartupOffsets;
+       }
+
        //////// ABSTRACT METHODS FOR SUBCLASSES
 
        /**
-        * Returns the version-specific Kafka consumer.
+        * Creates a version-specific Kafka consumer.
         *
         * @param topic                 Kafka topic to consume.
         * @param properties            Properties for the Kafka consumer.
         * @param deserializationSchema Deserialization schema to use for Kafka 
records.
         * @return The version-specific Kafka consumer
         */
-       abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+       protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
                        String topic,
                        Properties properties,
                        DeserializationSchema<Row> deserializationSchema);
@@ -201,6 +259,13 @@ public abstract class KafkaTableSource
 
                private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
 
+               /** The startup mode for the contained consumer (default is 
{@link StartupMode#GROUP_OFFSETS}). */
+               private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+               /** Specific startup offsets; only relevant when startup mode 
is {@link StartupMode#SPECIFIC_OFFSETS}. */
+               private Map<KafkaTopicPartition, Long> specificStartupOffsets = 
null;
+
+
                /**
                 * Sets the topic from which the table is read.
                 *
@@ -310,6 +375,51 @@ public abstract class KafkaTableSource
                }
 
                /**
+                * Configures the TableSource to start reading from the 
earliest offset for all partitions.
+                *
+                * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+                */
+               public B fromEarliest() {
+                       this.startupMode = StartupMode.EARLIEST;
+                       this.specificStartupOffsets = null;
+                       return builder();
+               }
+
+               /**
+                * Configures the TableSource to start reading from the latest 
offset for all partitions.
+                *
+                * @see FlinkKafkaConsumerBase#setStartFromLatest()
+                */
+               public B fromLatest() {
+                       this.startupMode = StartupMode.LATEST;
+                       this.specificStartupOffsets = null;
+                       return builder();
+               }
+
+               /**
+                * Configures the TableSource to start reading from any 
committed group offsets found in Zookeeper / Kafka brokers.
+                *
+                * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+                */
+               public B fromGroupOffsets() {
+                       this.startupMode = StartupMode.GROUP_OFFSETS;
+                       this.specificStartupOffsets = null;
+                       return builder();
+               }
+
+               /**
+                * Configures the TableSource to start reading partitions from 
specific offsets, set independently for each partition.
+                *
+                * @param specificStartupOffsets the specified offsets for 
partitions
+                * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+                */
+               public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> 
specificStartupOffsets) {
+                       this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+                       this.specificStartupOffsets = 
Preconditions.checkNotNull(specificStartupOffsets);
+                       return builder();
+               }
+
+               /**
                 * Returns the configured topic.
                 *
                 * @return the configured topic.
@@ -357,6 +467,16 @@ public abstract class KafkaTableSource
                        } else {
                                
tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
                        }
+                       tableSource.setStartupMode(startupMode);
+                       switch (startupMode) {
+                               case EARLIEST:
+                               case LATEST:
+                               case GROUP_OFFSETS:
+                                       break;
+                               case SPECIFIC_OFFSETS:
+                                       
tableSource.setSpecificStartupOffsets(specificStartupOffsets);
+                                       break;
+                       }
                }
 
                /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 7a882f4..64dac06 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.Row;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -44,6 +45,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Abstract test base for all Kafka table sources.
@@ -188,6 +190,48 @@ public abstract class KafkaTableSourceTestBase {
                }
        }
 
	/**
	 * Verifies that the startup mode chosen on the builder is propagated to the
	 * {@link FlinkKafkaConsumerBase} created by the table source: the default is
	 * group offsets, and each from*() builder call must translate into the matching
	 * setStartFrom*() call on the consumer.
	 *
	 * <p>NOTE(review): the stub/verify ordering here is deliberate — the spy's
	 * createKafkaConsumer() is stubbed AFTER build(), and the startup-mode call is
	 * only applied when getKafkaConsumer() is invoked inside verify(); verify() then
	 * inspects the mock consumer returned by the stub. Do not reorder.
	 */
	@Test
	public void testKafkaTSSetConsumeOffsets() {
		KafkaTableSource.Builder b = getBuilder();
		configureBuilder(b);

		// test the default behavior: no from*() call means GROUP_OFFSETS
		KafkaTableSource source = spy(b.build());
		// hand the spy a mock consumer so the setStartFrom*() call can be observed
		when(source.createKafkaConsumer(TOPIC, PROPS, null))
				.thenReturn(mock(getFlinkKafkaConsumer()));

		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();

		// test reading from earliest
		b.fromEarliest();
		source = spy(b.build());
		when(source.createKafkaConsumer(TOPIC, PROPS, null))
				.thenReturn(mock(getFlinkKafkaConsumer()));

		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();

		// test reading from latest
		b.fromLatest();
		source = spy(b.build());
		when(source.createKafkaConsumer(TOPIC, PROPS, null))
				.thenReturn(mock(getFlinkKafkaConsumer()));
		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();

		// test reading from group offsets (explicitly requested, same as default)
		b.fromGroupOffsets();
		source = spy(b.build());
		when(source.createKafkaConsumer(TOPIC, PROPS, null))
				.thenReturn(mock(getFlinkKafkaConsumer()));
		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();

		// test reading from given offsets; the offsets map content is irrelevant
		// here, only that it is forwarded to setStartFromSpecificOffsets
		b.fromSpecificOffsets(mock(Map.class));
		source = spy(b.build());
		when(source.createKafkaConsumer(TOPIC, PROPS, null))
				.thenReturn(mock(getFlinkKafkaConsumer()));
		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
	}
+
        protected abstract KafkaTableSource.Builder getBuilder();
 
        protected abstract Class<DeserializationSchema<Row>> 
getDeserializationSchema();

Reply via email to