[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2509




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101233499
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws 
Exception {
readProps.setProperty("auto.offset.reset", "earliest");
 
// the committed group offsets should be used as starting points
-   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   Properties offsetHandlerProps = new Properties();
+   offsetHandlerProps.putAll(standardProps);
+   offsetHandlerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   offsetHandlerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

I didn't do that because the constructor of `KafkaOffsetHandlerImpl` has 
no knowledge of whether the provided `Properties` should be manipulated or not 
(users of `KafkaOffsetHandlerImpl` provide the properties).

However, I think it would make sense to do what you suggested in 
`KafkaOffsetHandlerImpl` if it always just uses `standardProps` instead of 
provided properties. In our case that would be completely fine. I'll change it 
as proposed.
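
For illustration, a minimal sketch of the change being agreed on here, assuming 
`KafkaOffsetHandlerImpl` wraps a plain `KafkaConsumer` used only for committing 
and fetching offsets (the class body is abbreviated and illustrative, not the 
actual test code):

```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaOffsetHandlerImpl {

	private final KafkaConsumer<byte[], byte[]> offsetClient;

	public KafkaOffsetHandlerImpl(Properties standardProps) {
		// always derive the config from standardProps, so callers
		// no longer have to set the deserializers themselves
		Properties props = new Properties();
		props.putAll(standardProps);
		props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
		props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
		this.offsetClient = new KafkaConsumer<>(props);
	}

	public void setCommittedOffset(String topic, int partition, long offset) {
		offsetClient.commitSync(Collections.singletonMap(
				new TopicPartition(topic, partition), new OffsetAndMetadata(offset)));
	}

	public void close() {
		offsetClient.close();
	}
}
```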




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-15 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101231413
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws 
Exception {
readProps.setProperty("auto.offset.reset", "earliest");
 
// the committed group offsets should be used as starting points
-   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   Properties offsetHandlerProps = new Properties();
+   offsetHandlerProps.putAll(standardProps);
+   offsetHandlerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   offsetHandlerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Doesn't it make sense to set these properties once in the 
`KafkaOffsetHandlerImpl` instead of in every location where the offset handler 
is created?




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101075570
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which 
contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic 
for this message.
 
+ Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the 
consumer group's (`group.id` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for 
Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record 
possible
+myConsumer.setStartFromLatest();   // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
--- End diff --

Okay, cool. Can you add that to the docs as well?




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101075446
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Probably its the `KafkaOffsetHandlerImpl`. In that case, yes, just put the 
additional properties when creating the instance.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101074257
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Yes, move these settings out of the standard properties




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051598
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 50 records written to each of 3 partitions before launching 
a latest-starting consuming job
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   // each partition will be written an extra 200 records
+   final int extraRecordsInEachPartition = 200;
+
+   // all already existing data in the topic, before the consuming 
topology has started, should be ignored
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   // job names for the topologies for writing and consuming the 
extra records
+   final String consumeExtraRecordsJobName = "Consume Extra 
Records Job";
+   final String writeExtraRecordsJobName = "Write Extra Records 
Job";
+
+			// serialization / deserialization schemas for writing and consuming the extra records
+			final TypeInformation<Tuple2<Integer, Integer>> resultType =
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+			final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new KeyedSerializationSchemaWrapper<>(
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+			final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+				new KeyedDeserializationSchemaWrapper<>(
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   // setup and run the latest-consuming job
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051496
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Perhaps I should move this out of the `standardProps` and set them in those 
tests only.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051346
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Somehow, without these settings, the new tests for the `setStartFrom` 
methods fail, complaining that the property config does not specify settings 
for the deserializers.

I guess it is because those tests have Kafka clients that are used 
*only* for offset committing and fetching, in which case the client cannot 
infer the types to use for the deserializers?
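
For context, a sketch of the failure mode, assuming the 0.9+ consumer client 
(the topic name is a placeholder): the new client validates its configuration 
eagerly, so even a consumer used purely for offset calls needs deserializers 
configured.

```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink-tests");

// without the two settings below, new KafkaConsumer<>(props) throws a
// ConfigException about the missing key.deserializer / value.deserializer,
// even though this client never deserializes a single record
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> offsetOnlyClient = new KafkaConsumer<>(props);
OffsetAndMetadata committed = offsetOnlyClient.committed(new TopicPartition("testTopic", 0));
```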




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101050769
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which 
contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic 
for this message.
 
+ Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the 
consumer group's (`group.id` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for 
Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record 
possible
+myConsumer.setStartFromLatest();   // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val myConsumer = new FlinkKafkaConsumer08[String](...)
+myConsumer.setStartFromEarliest()  // start from the earliest record 
possible
+myConsumer.setStartFromLatest()// start from the latest record
+myConsumer.setStartFromGroupOffsets()  // the default behaviour
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+
+
+
+All versions of the Flink Kafka Consumer have the above explicit 
configuration methods for start position. When
+configured to start from the earliest or latest record by calling either 
`setStartFromEarliest()` or `setStartFromLatest()`,
+the consumer will ignore any committed group offsets in Kafka when 
determining the start position for partitions.
--- End diff --

Good point, will add.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101050554
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which 
contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic 
for this message.
 
+ Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the 
consumer group's (`group.id` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for 
Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record 
possible
+myConsumer.setStartFromLatest();   // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
--- End diff --

Yes. If the consumer group does not contain offsets for a partition, the 
"auto.offset.reset" property is used for that partition. I think this is the 
behaviour of Kafka's high level consumer.
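
For illustration, a sketch of that default in terms of the consumer properties 
(topic, schema, and connection strings are placeholders; 
`setStartFromGroupOffsets()` is the method added by this PR):

```
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-consumer-group");  // group whose committed offsets are consulted
props.setProperty("auto.offset.reset", "earliest");  // fallback for partitions without a committed offset

FlinkKafkaConsumer08<String> myConsumer =
	new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);
myConsumer.setStartFromGroupOffsets(); // the default, shown explicitly
```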




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101048002
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 ---
@@ -299,91 +131,60 @@ public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {

testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
testHarness.setup();
+   // restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(

getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
testHarness.open();
 
-   final Throwable[] error = new Throwable[1];
+			// assert that the subscribed partitions exist and are identical to the expected list
+   Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
+   
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+   Assert.assertEquals(partitions, 
consumerFunction.getSubscribedPartitions());
 
-   // run the source asynchronously
-   Thread runner = new Thread() {
-   @Override
-   public void run() {
-   try {
-   consumerFunction.run(new 
DummySourceContext() {
-   @Override
-   public void collect(String 
element) {
-   //latch.trigger();
-   }
-   });
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   error[0] = t;
-   }
-   }
-   };
-   runner.start();
+   // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot"
+			final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
+   expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
+   expectedState.put(new KafkaTopicPartition("def", 7), 
987654321L);
 
-   if (!latch.isTriggered()) {
-   latch.await();
-   }
+   // assert that state is correctly restored from legacy 
checkpoint
+   Assert.assertTrue(consumerFunction.getRestoredState() != null);
+   Assert.assertEquals(expectedState, 
consumerFunction.getRestoredState());
 
consumerOperator.close();
-
-   runner.join();
-
-   Assert.assertNull(error[0]);
-   }
-
-   private abstract static class DummySourceContext
-   implements SourceFunction.SourceContext {
-
-   private final Object lock = new Object();
-
-   @Override
-   public void collectWithTimestamp(String element, long 
timestamp) {
-   }
-
-   @Override
-   public void emitWatermark(Watermark mark) {
-   }
-
-   @Override
-   public Object getCheckpointLock() {
-   return lock;
-   }
-
-   @Override
-   public void close() {
-   }
--- End diff --

I see. That's what this comment is for: 
https://github.com/apache/flink/pull/2509#issuecomment-277438812

@kl0u I think you've implemented most of the migration tests. Can you take 
a look at the changes @tzulitai is proposing?




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101045081
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 50 records written to each of 3 partitions before launching 
a latest-starting consuming job
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   // each partition will be written an extra 200 records
+   final int extraRecordsInEachPartition = 200;
+
+   // all already existing data in the topic, before the consuming 
topology has started, should be ignored
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   // job names for the topologies for writing and consuming the 
extra records
+   final String consumeExtraRecordsJobName = "Consume Extra 
Records Job";
+   final String writeExtraRecordsJobName = "Write Extra Records 
Job";
+
+			// serialization / deserialization schemas for writing and consuming the extra records
+			final TypeInformation<Tuple2<Integer, Integer>> resultType =
+				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+			final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new KeyedSerializationSchemaWrapper<>(
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+			final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+				new KeyedDeserializationSchemaWrapper<>(
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   // setup and run the latest-consuming job
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101047374
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 ---
@@ -299,91 +131,60 @@ public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {

testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
testHarness.setup();
+   // restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(

getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
testHarness.open();
 
-   final Throwable[] error = new Throwable[1];
+			// assert that the subscribed partitions exist and are identical to the expected list
+   Assert.assertTrue(consumerFunction.getSubscribedPartitions() != 
null);
+   
Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+   Assert.assertEquals(partitions, 
consumerFunction.getSubscribedPartitions());
 
-   // run the source asynchronously
-   Thread runner = new Thread() {
-   @Override
-   public void run() {
-   try {
-   consumerFunction.run(new 
DummySourceContext() {
-   @Override
-   public void collect(String 
element) {
-   //latch.trigger();
-   }
-   });
-   }
-   catch (Throwable t) {
-   t.printStackTrace();
-   error[0] = t;
-   }
-   }
-   };
-   runner.start();
+   // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot"
+			final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
+   expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
+   expectedState.put(new KafkaTopicPartition("def", 7), 
987654321L);
 
-   if (!latch.isTriggered()) {
-   latch.await();
-   }
+   // assert that state is correctly restored from legacy 
checkpoint
+   Assert.assertTrue(consumerFunction.getRestoredState() != null);
+   Assert.assertEquals(expectedState, 
consumerFunction.getRestoredState());
 
consumerOperator.close();
-
-   runner.join();
-
-   Assert.assertNull(error[0]);
-   }
-
-   private abstract static class DummySourceContext
-   implements SourceFunction.SourceContext {
-
-   private final Object lock = new Object();
-
-   @Override
-   public void collectWithTimestamp(String element, long 
timestamp) {
-   }
-
-   @Override
-   public void emitWatermark(Watermark mark) {
-   }
-
-   @Override
-   public Object getCheckpointLock() {
-   return lock;
-   }
-
-   @Override
-   public void close() {
-   }
--- End diff --

Looks like you've removed a lot of code from this test here. I guess that 
the `DummyFlinkKafkaConsumer` covers everything the deleted code did?




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101000141
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Why is this needed now? If we have this in the default props, we cannot 
ensure that users don't need to set it manually.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101000341
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which 
contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic 
for this message.
 
+ Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the 
consumer group's (`group.id` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for 
Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record 
possible
+myConsumer.setStartFromLatest();   // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
--- End diff --

Does the "the default behaviour" also mean that we only respect the 
"auto.offset.reset" configs in that case?




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r100989660
  
--- Diff: docs/dev/connectors/kafka.md ---
@@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
 The KeyValue objectNode contains a "key" and "value" field which 
contain all fields, as well as
 an optional "metadata" field that exposes the offset/partition/topic 
for this message.
 
+ Kafka Consumers Start Position Configuration
+
+By default, the Flink Kafka Consumer starts reading partitions from the 
consumer group's (`group.id` setting in the
+consumer properties) committed offsets in Kafka brokers (or Zookeeper for 
Kafka 0.8).
+
+This behaviour can be explicitly overridden, as demonstrated below:
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
+myConsumer.setStartFromEarliest(); // start from the earliest record 
possible
+myConsumer.setStartFromLatest();   // start from the latest record
+myConsumer.setStartFromGroupOffsets(); // the default behaviour
+
+DataStream<String> stream = env.addSource(myConsumer);
+...
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val myConsumer = new FlinkKafkaConsumer08[String](...)
+myConsumer.setStartFromEarliest()  // start from the earliest record 
possible
+myConsumer.setStartFromLatest()// start from the latest record
+myConsumer.setStartFromGroupOffsets()  // the default behaviour
+
+val stream = env.addSource(myConsumer)
+...
+{% endhighlight %}
+
+
+
+All versions of the Flink Kafka Consumer have the above explicit 
configuration methods for start position. When
+configured to start from the earliest or latest record by calling either 
`setStartFromEarliest()` or `setStartFromLatest()`,
+the consumer will ignore any committed group offsets in Kafka when 
determining the start position for partitions.
--- End diff --

Maybe we should add a note that this setting does NOT affect the start 
position when restoring from a savepoint or checkpoint.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r100988931
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

No, we are currently not checking the tests




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89756558
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -482,12 +476,39 @@ public void runStartFromEarliestOffsets() throws 
Exception {
 * ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
 */
public void runStartFromLatestOffsets() throws Exception {
--- End diff --

To make this test easier without having to sleep, the test now does this:

1. First write 50 records to each partition (these shouldn't be read)
2. Set some offsets in Kafka (should be ignored)
3. Start a latest-reading consuming job. This job throws an exception if it 
reads any of the first 50 records (see the sketch after this list).
4. Wait until the consume job has fully started (added a util method to 
`JobManagerCommunicationUtils` for this).
5. Write 200 extra records to each partition.
6. Once the writing finishes, cancel the consume job.
7. Check if the consume job threw any test errors.
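
A sketch of the guard used in step 3, assuming the `Tuple2<Integer, Integer>` 
records used elsewhere in these tests with the record value in field `f1` (the 
field layout and the surrounding topology are assumptions):

```
DataStream<Tuple2<Integer, Integer>> stream = env.addSource(latestReadingConsumer);

stream.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
	@Override
	public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
		// values 0..49 belong to the 50 pre-existing records per partition;
		// seeing one means the consumer did not actually start from latest
		if (value.f1 < 50) {
			throw new RuntimeException("Test failed: read a pre-existing record " + value);
		}
		return value;
	}
}).addSink(new DiscardingSink<Tuple2<Integer, Integer>>());
```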




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89755566
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -133,11 +132,10 @@ public void testMetricsAndEndOfStream() throws 
Exception {
 
// --- startup mode ---
 
-   // TODO not passing due to Kafka Consumer config error
-// @Test(timeout = 6)
-// public void testStartFromEarliestOffsets() throws Exception {
-// runStartFromEarliestOffsets();
-// }
+   @Test(timeout = 6)
+   public void testStartFromEarliestOffsets() throws Exception {
--- End diff --

These tests pass now with no problem. You're right, setting the key/value 
deserializer keys did the trick.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89755327
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
List<KafkaTopicPartition> thisSubtaskPartitions,
+   HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
--- End diff --

To make the startup mode logic cleaner, I've changed the `AbstractFetcher` 
life cycle a bit.
Now, restored state is provided when constructing the `AbstractFetcher`, 
instead of explicitly calling `AbstractFetcher#restoreOffsets()` as a separate 
call.

This allows the AbstractFetcher to have a final `isRestored` flag that 
version-specific implementations can use.

The startup offset configuring logic is much simpler now with this flag:
```
if (isRestored) {
  // all subscribed partition states should have defined offset
  // setup the KafkaConsumer client we're using to respect these restored 
offsets
} else {
  // all subscribed partition states have no defined offset
  // (1) set offsets depending on whether startup mode is EARLIEST, LATEST, 
or GROUP_OFFSET
  // (2) use the fetched offsets from Kafka to set the initial partition 
states we use in Flink.
}
```
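
A compressed sketch of that life cycle change (class and parameter names follow 
the discussion above, but the body is heavily abbreviated and illustrative):

```
import java.util.HashMap;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public abstract class AbstractFetcherSketch<T> {

	// can be final because restored state now arrives through the
	// constructor instead of a later restoreOffsets() call
	protected final boolean isRestored;

	protected AbstractFetcherSketch(
			SourceFunction.SourceContext<T> sourceContext,
			List<KafkaTopicPartition> subscribedPartitions,
			HashMap<KafkaTopicPartition, Long> restoredSnapshotState) {

		this.isRestored = restoredSnapshotState != null;

		if (isRestored) {
			// seed every subscribed partition state with its snapshot offset;
			// version-specific fetchers then seek their Kafka clients to these offsets
		} else {
			// leave offsets undefined; they are resolved from the startup mode
			// (EARLIEST / LATEST / GROUP_OFFSETS) when the fetch loop starts
		}
	}
}
```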




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-26 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89680317
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   }
+   });
+   consumeThread.start();
+
+   Thread.sleep(5000);
--- End diff --

Actually, the sleep here isn't waiting for the readSequence call to finish. 
I'm waiting a bit to make sure that the consume job has fully started. It won't 
be able to read anything until new latest data is generated afterwards, which 
is done below by `DataGenerators.generateRandomizedIntegerSequence`.

So, what the test is doing is:
1. Write 50 records to each partition.
2. Commit some random offsets.
3. Start a job to read from latest in a separate thread. (should not read 
any of the previous data, offsets also ignored). The `readSequence` is expected 
to read 30 more records from each partition
4. Make sure the 

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89664876
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -218,26 +221,57 @@ public void run() {
}
}
 
-   // seek the consumer to the initial offsets
+			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
if (partition.isOffsetDefined()) {
LOG.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; seeking the consumer " +
"to position {}", 
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
 

consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
} else {
-   // for partitions that do not have 
offsets restored from a checkpoint/savepoint,
-   // we need to define our internal 
offset state for them using the initial offsets retrieved from Kafka
-   // by the KafkaConsumer, so that they 
are correctly checkpointed and committed on the next checkpoint
+   
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+   }
+   }
 
-   long fetchedOffset = 
consumer.position(partition.getKafkaPartitionHandle());
+   if (partitionsWithNoOffset.size() == 
subscribedPartitions().length) {
+   // if all partitions have no initial offsets, 
that means we're starting fresh
+   switch (startupMode) {
+   case EARLIEST:
+   LOG.info("Setting starting 
point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+   
seekPartitionsToBeginning(consumer, 
convertKafkaPartitions(subscribedPartitions()));
--- End diff --

The problem with this one is that the `seekToBeginning` method broke 
compatibility from 0.8 to 0.9+. In 0.8, it's `seekToBeginning(TopicPartition...)`, 
while in 0.9+ it's `seekToBeginning(Collection)`.

I'll integrate these seek methods into the `KafkaConsumerCallBridge` 
introduced in a recent PR. It's inevitable that we must redundantly do the 
Array -> List conversion, because our `subscribedPartitions` is an array while 
the 0.9+ methods take a Collection. For the 0.8 methods, instead of converting 
the list back to an array, I'll just iterate over the list and call 
`seekPartitionsToBeginning` for each one.
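
For illustration, a sketch of how the bridge could absorb the signature 
difference. Each variant would live in a version-specific connector module and 
compile against that module's Kafka client; the layout is an assumption, not 
the actual Flink code:

```
// in the module whose client exposes seekToBeginning(TopicPartition...)
public class KafkaConsumerCallBridge {
	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
		// varargs signature: the List -> array conversion is unavoidable here
		consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
	}
}

// in the module whose client exposes seekToBeginning(Collection<TopicPartition>)
public class CollectionSeekingCallBridge extends KafkaConsumerCallBridge {
	@Override
	public void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
		consumer.seekToBeginning(partitions);
	}
}
```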




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89365136
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 ---
@@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
 
PeriodicOffsetCommitter periodicCommitter = null;
try {
-   // read offsets from ZooKeeper for partitions that did 
not restore offsets
-   {
-			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-				if (!partition.isOffsetDefined()) {
-					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-				}
+			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+				if (!partition.isOffsetDefined()) {
+					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
				}
+   }
+
+   if (partitionsWithNoOffset.size() == 
subscribedPartitions().length) {
+   // if all partitions have no initial offsets, 
that means we're starting fresh without any restored state
+   switch (startupMode) {
+   case EARLIEST:
+   LOG.info("Setting starting 
point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+   
partition.setOffset(OffsetRequest.EarliestTime());
+   }
+   break;
+   case LATEST:
+   LOG.info("Setting starting 
point as latest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+   
partition.setOffset(OffsetRequest.LatestTime());
+   }
+   break;
+   default:
+   case GROUP_OFFSETS:
+   LOG.info("Using group offsets 
in Zookeeper of group.id {} as starting point for partitions {}",
+   
kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
+
+					Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+   Long offset = 
zkOffsets.get(partition.getKafkaTopicPartition());
+   if (offset != null) {
+   // the 
committed offset in ZK represents the next record to process,
+   // so we 
subtract it by 1 to correctly represent internal state
+   
partition.setOffset(offset - 1);
+   } else {
+   // if we can't 
find an offset for a partition in ZK when using GROUP_OFFSETS,
+   // we default 
to "auto.offset.reset" like the Kafka high-level consumer
+   LOG.warn("No 
group offset can be found for partition {} in Zookeeper;" +
+   " 
resetting starting offset to 'auto.offset.reset'", partition);
+
+   
partition.setOffset(invalidOffsetBehavior);
+   }
+   }
+   }
+   } else if (partitionsWithNoOffset.size() > 0 && 
partitionsWithNoOffset.size() < subscribedPartitions().length) {

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361344
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 ---
@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
 schema,
 new Properties(),
 0L,
+   StartupMode.GROUP_OFFSETS,
--- End diff --

Will do!




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89364029
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   }
+   });
+   consumeThread.start();
+
+   Thread.sleep(5000);
--- End diff --

Will probably need to write a different / custom read method or topology 
for this then.
The problem is that I wanted to reuse `readSequence()` for the test, but it 
expects an exact number of read elements for the test to succeed.




[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89362689
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -408,23 +412,32 @@ else if (partitionsRemoved) {
}
}
 
-	private void getMissingOffsetsFromKafka(
+	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
	{
		// collect which partitions we should fetch offsets for
-		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
--- End diff --

Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if 
getOffset() returns `OffsetRequest.EarliestTime()` or 
`OffsetRequest.LatestTime()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89363213
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
--- End diff --

Ah, right! Will fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361839
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

Ah, this was an IDE auto-complete. The style checks don't cover the test 
code, right? I'll revert this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89361688
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws 
Exception {
runEndOfStreamTest();
}
 
+   // --- startup mode ---
+
+   // TODO not passing due to Kafka Consumer config error
--- End diff --

Hmm, if I recall correctly, that's what I did in the first place, but that 
caused some other issues. I'll definitely give this another look and make sure 
the test is runnable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89275454
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -41,16 +41,15 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.Seq;
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

Star import


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89283859
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -306,13 +340,23 @@ public void run() {
}
}
 
-	// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
+	// ------------------------------------------------------------------------
+	//  Protected methods that allow pluggable Kafka version-specific implementations
+	// ------------------------------------------------------------------------
+
	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord<?, ?> consumerRecord) throws Exception {
+		// Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
		emitRecord(record, partition, offset, Long.MIN_VALUE);
--- End diff --

This will probably break as well when rebasing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89283319
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -408,23 +412,32 @@ else if (partitionsRemoved) {
}
}
 
-	private void getMissingOffsetsFromKafka(
+	private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
			List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
	{
		// collect which partitions we should fetch offsets for
-		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
+		List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();
--- End diff --

I don't see why you need to copy the partitions into these lists.
I think you can just go over the list and call getLastOffsetFromKafka with 
`part.getOffset()` as the last argument.
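
Roughly, a single pass like this (a sketch only; the exact parameters of
`getLastOffsetFromKafka` are assumed here, and the follow-up refinement guards the call
on the earliest/latest sentinel values):

```
for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
	// the sentinel offset (earliest/latest) doubles as the "which time" argument
	if (part.getOffset() == OffsetRequest.EarliestTime()
			|| part.getOffset() == OffsetRequest.LatestTime()) {
		getLastOffsetFromKafka(consumer, Collections.singletonList(part), part.getOffset());
	}
}
```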


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89284541
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -218,26 +221,57 @@ public void run() {
}
}
 
-		// seek the consumer to the initial offsets
+		List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
		for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
			if (partition.isOffsetDefined()) {
				LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
					"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

				consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
			} else {
-				// for partitions that do not have offsets restored from a checkpoint/savepoint,
-				// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-				// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+				partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+			}
+		}

-				long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+		if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+			// if all partitions have no initial offsets, that means we're starting fresh
+			switch (startupMode) {
+				case EARLIEST:
+					LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
+					break;
+				case LATEST:
+					LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+					seekPartitionsToEnd(consumer, convertKafkaPartitions(subscribedPartitions()));
--- End diff --

Same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89276062
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
+   }
+   }
+   });
+   consumeThread.start();
+
+   Thread.sleep(5000);
--- End diff --

This is dangerous because there is no guarantee that the `readSequence` 
call finishes in 5 seconds (Travis sometimes has pretty slow test execution).

You can probably avoid the sleep by writing a defined number of elements 
into the topic after `readSequence()` has started. Then, you check whether the 
number of read elements is lower than or equal to that defined number.
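
A sketch of that structure, reusing the thread from the quoted diff
(`writeAdditionalRecords` is a hypothetical helper that appends records to the
already-existing topic; it is not part of the current test base):

```
// start the reading job first, so the LATEST consumer only sees the records produced below
Thread consumeThread = new Thread(new Runnable() {
	@Override
	public void run() {
		try {
			readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, extraRecordsInEachPartition, recordsInEachPartition);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
});
consumeThread.start();

// once the consumer is running, append a defined number of extra records to the
// existing topic; the assertion is then bounded by that number instead of by a sleep
writeAdditionalRecords(topicName, extraRecordsInEachPartition, parallelism);

consumeThread.join();
```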


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89275211
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ---
@@ -501,4 +514,14 @@ private static void getLastOffsetFromKafka(
part.setOffset(offset - 1);
}
}
-}
\ No newline at end of file
+
+	private static void checkAllPartitionsHaveDefinedStartingOffsets(
+			List<KafkaTopicPartitionState<TopicAndPartition>> partitions)
+	{
+		for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
+			if (!part.isOffsetDefined()) {
+				throw new RuntimeException("SimpleConsumerThread received a partition with undefined starting offset");
--- End diff --

Since we use this in an argument check, we should throw an 
IllegalArgumentException here.
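
I.e., a sketch of the same check with the suggested exception type:

```
private static void checkAllPartitionsHaveDefinedStartingOffsets(
		List<KafkaTopicPartitionState<TopicAndPartition>> partitions)
{
	for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
		if (!part.isOffsetDefined()) {
			throw new IllegalArgumentException(
				"SimpleConsumerThread received a partition with undefined starting offset");
		}
	}
}
```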


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89282863
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 ---
@@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
 
PeriodicOffsetCommitter periodicCommitter = null;
try {
-		// read offsets from ZooKeeper for partitions that did not restore offsets
-		{
-			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
-			for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
-				if (!partition.isOffsetDefined()) {
-					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
-				}
+		List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
+		for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+			if (!partition.isOffsetDefined()) {
+				partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
			}
+		}
+
+		if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+			// if all partitions have no initial offsets, that means we're starting fresh without any restored state
+			switch (startupMode) {
+				case EARLIEST:
+					LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+						partition.setOffset(OffsetRequest.EarliestTime());
+					}
+					break;
+				case LATEST:
+					LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+						partition.setOffset(OffsetRequest.LatestTime());
+					}
+					break;
+				default:
+				case GROUP_OFFSETS:
+					LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
+						kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
+
+					Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
+					for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
+						Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
+						if (offset != null) {
+							// the committed offset in ZK represents the next record to process,
+							// so we subtract it by 1 to correctly represent internal state
+							partition.setOffset(offset - 1);
+						} else {
+							// if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
+							// we default to "auto.offset.reset" like the Kafka high-level consumer
+							LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
+								" resetting starting offset to 'auto.offset.reset'", partition);
+
+							partition.setOffset(invalidOffsetBehavior);
+						}
+					}
+			}
+		} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {

[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89150327
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
 ---
@@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
 schema,
 new Properties(),
 0L,
+   StartupMode.GROUP_OFFSETS,
--- End diff --

Looks like the indentation of the added lines is correct, but the 
indentation of the file is wrong. Could you fix that with the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89275431
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 ---
@@ -110,6 +110,25 @@ public void testMetrics() throws Throwable {
runMetricsTest();
}
 
+   // --- startup mode ---
+
+   // TODO not passing due to Kafka Consumer config error
+// @Test(timeout = 6)
+// public void testStartFromEarliestOffsets() throws Exception {
+// runStartFromEarliestOffsets();
+// }
--- End diff --

We should fix that as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89275697
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -444,6 +445,134 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "earliest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   Thread consumeThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   readSequence(env, StartupMode.LATEST, 
readProps, parallelism, topicName, 30, 50);
+   } catch (Exception e) {
+   throw new RuntimeException(e);
--- End diff --

This exception will not fail the test.
You need to define a Throwable field, set it in the thread and check it 
once the thread has finished.
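
For example (a sketch; an `AtomicReference<Throwable>` from `java.util.concurrent.atomic`
works just as well as a plain field):

```
final AtomicReference<Throwable> error = new AtomicReference<>();

Thread consumeThread = new Thread(new Runnable() {
	@Override
	public void run() {
		try {
			readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
		} catch (Exception e) {
			// remember the failure instead of losing it with the thread
			error.set(e);
		}
	}
});
consumeThread.start();

// ... produce the records the job is expected to read ...

consumeThread.join();
if (error.get() != null) {
	throw new AssertionError("Consume thread failed", error.get());
}
```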


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89284524
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -218,26 +221,57 @@ public void run() {
}
}
 
-		// seek the consumer to the initial offsets
+		List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
		for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
			if (partition.isOffsetDefined()) {
				LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
					"to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

				consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
			} else {
-				// for partitions that do not have offsets restored from a checkpoint/savepoint,
-				// we need to define our internal offset state for them using the initial offsets retrieved from Kafka
-				// by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
+				partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+			}
+		}

-				long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
+		if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+			// if all partitions have no initial offsets, that means we're starting fresh
+			switch (startupMode) {
+				case EARLIEST:
+					LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
--- End diff --

It's a bit inefficient to convert the `subscribedPartitions()` to an 
ArrayList, and then in `seekPartitionsToBeginning` the List is converted back 
into an array. I think we can save the `ArrayList` step and create an array 
immediately.
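
Something like this direct conversion would avoid the intermediate list (a sketch,
based on the accessors used in the quoted code):

```
public static TopicPartition[] convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
	TopicPartition[] result = new TopicPartition[partitions.length];
	for (int i = 0; i < partitions.length; i++) {
		result[i] = partitions[i].getKafkaPartitionHandle();
	}
	return result;
}
```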


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89274682
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

Star imports are something we try to avoid in Flink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89276937
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -83,30 +90,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer
	@Override
	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord<?, ?> consumerRecord) throws Exception {
		// get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
-		super.emitRecord(record, partition, offset, consumerRecord.timestamp());
+		emitRecord(record, partition, offset, consumerRecord.timestamp());
	}

-	/**
-	 * Emit record Kafka-timestamp aware.
-	 */
	@Override
-	protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
-		if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
-			// fast path logic, in case there are no watermarks
+	protected void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
+		consumer.seekToBeginning(partitions);
+	}

-			// emit the record, using the checkpoint lock to guarantee
-			// atomicity of record emission and offset state update
-			synchronized (checkpointLock) {
-				sourceContext.collectWithTimestamp(record, timestamp);
-				partitionState.setOffset(offset);
-			}
--- End diff --

This has been refactored recently. I think you need to rebase the pull 
request and update the code here.
Sorry for that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-11-23 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r89274555
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws 
Exception {
runEndOfStreamTest();
}
 
+   // --- startup mode ---
+
+   // TODO not passing due to Kafka Consumer config error
--- End diff --

This is easy to fix, right? You just have to put serializer classes into 
the `standardProps`.
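
Presumably something along these lines in the test environment setup (the consumer
side needs deserializers; the producer side needs the corresponding serializers):

```
standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
```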


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83165620
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -202,13 +204,53 @@ public void run() {
}
}
 
-		// seek the consumer to the initial offsets
+		List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
		for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-			if (partition.isOffsetDefined()) {
+			if (!partition.isOffsetDefined()) {
+				partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+			} else {
				consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
			}
		}

+		if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+			// if all partitions have no initial offsets, that means we're starting fresh
+			switch (startupMode) {
+				case EARLIEST:
+					LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+						consumer.seekToBeginning(partition.getKafkaPartitionHandle());
+					}
+					break;
+				case LATEST:
+					LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+						consumer.seekToEnd(partition.getKafkaPartitionHandle());
+					}
+					break;
+				default:
+				case GROUP_OFFSETS:
+					LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
+						kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
+					// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
+			}
+		} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
--- End diff --

@rmetzger after looking at 
https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed 
changes here (in the `Kafka09Fetcher`, overall) actually fix that issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-13 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83165323
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 ---
@@ -202,13 +204,53 @@ public void run() {
}
}
 
-		// seek the consumer to the initial offsets
+		List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
		for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-			if (partition.isOffsetDefined()) {
+			if (!partition.isOffsetDefined()) {
+				partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+			} else {
				consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
			}
		}

+		if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+			// if all partitions have no initial offsets, that means we're starting fresh
+			switch (startupMode) {
+				case EARLIEST:
+					LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+						consumer.seekToBeginning(partition.getKafkaPartitionHandle());
+					}
+					break;
+				case LATEST:
+					LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+					for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+						consumer.seekToEnd(partition.getKafkaPartitionHandle());
+					}
+					break;
+				default:
+				case GROUP_OFFSETS:
+					LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
+						kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
+					// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
+			}
+		} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
+			// we are restoring from a checkpoint/savepoint, but there are some new partitions that weren't
+			// subscribed by the consumer on the previous execution; in this case, we set the starting offset
+			// of all new partitions to the earliest offset
+			LOG.info("Setting starting point as earliest offset for newly created partitions after startup: {}", partitionsWithNoOffset);
+
+			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+				if (partitionsWithNoOffset.contains(partition.getKafkaTopicPartition())) {
+					consumer.seekToBeginning(partition.getKafkaPartitionHandle());
+				}
+			}
+		} else {
+			// restored from a checkpoint/savepoint, and all partitions have starting offsets; don't need to do anything
+		}
+
--- End diff --

@rmetzger after looking at 
https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed 
changes here actually fix that issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83141867
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -345,16 +343,19 @@ protected static void 
validateZooKeeperConfig(Properties props) {
}
}
 
-   private static long getInvalidOffsetBehavior(Properties config) {
+   /**
+* Check for invalid "auto.offset.reset" values. Should be called in 
constructor for eager checking before submitting
+* the job. Note that 'none' is also considered invalid, as we don't 
want to deliberately throw an exception
--- End diff --

Thanks for pointing this out. To keep things simple for now, I propose to 
fix https://issues.apache.org/jira/browse/FLINK-3037 as a separate PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83003348
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -345,16 +343,19 @@ protected static void 
validateZooKeeperConfig(Properties props) {
}
}
 
-   private static long getInvalidOffsetBehavior(Properties config) {
+   /**
+* Check for invalid "auto.offset.reset" values. Should be called in 
constructor for eager checking before submitting
+* the job. Note that 'none' is also considered invalid, as we don't 
want to deliberately throw an exception
--- End diff --

Thank you for the pointer.
We are discussing this issue here 
https://issues.apache.org/jira/browse/FLINK-4280 and here 
https://issues.apache.org/jira/browse/FLINK-3037


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-11 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r82923872
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -345,16 +343,19 @@ protected static void 
validateZooKeeperConfig(Properties props) {
}
}
 
-   private static long getInvalidOffsetBehavior(Properties config) {
+   /**
+* Check for invalid "auto.offset.reset" values. Should be called in 
constructor for eager checking before submitting
+* the job. Note that 'none' is also considered invalid, as we don't 
want to deliberately throw an exception
--- End diff --

Look out for https://issues.apache.org/jira/browse/KAFKA-3370 if you 
aren't already aware. Right now, allowing 'none' and catching the exception only 
on startup is the best workaround I've seen.
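
For reference, a minimal sketch of that workaround against the plain Kafka 0.9+ consumer
API (`NoOffsetForPartitionException` is the exception thrown for `auto.offset.reset=none`
when no committed offset exists; topic and group names here are placeholders):

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
props.setProperty("auto.offset.reset", "none"); // fail instead of silently resetting
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
	consumer.poll(1000); // the first poll resolves the starting positions
} catch (NoOffsetForPartitionException e) {
	// can only happen on startup: no committed offset and reset policy "none";
	// decide explicitly where to start instead of relying on auto.offset.reset
	consumer.seekToBeginning(e.partition());
}
```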


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-09-17 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2509

[FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer

This PR adds the following new explicit setter methods to configure the 
starting position for the Kafka Consumer connector:

```
FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...) // or 09
kafka.setStartFromEarliest(); // start from earliest without respecting any committed offsets
kafka.setStartFromLatest(); // start from latest without respecting any committed offsets
kafka.setStartFromGroupOffsets(); // respects committed offsets in ZK / Kafka as starting points
```

The default is to start from group offsets, so we won't be breaking 
existing user code.

One thing to note is that this PR also includes some refactoring to 
consolidate all start offset assigning logic for partitions within the fetcher. 
For example, in the 0.8 version, with this change the `SimpleConsumerThread` no 
longer deals with deciding where a partition needs to start from; all 
partitions should already be assigned starting offsets by the fetcher, and it 
simply needs to start consuming the partition. This is preparation for 
transparent partition discovery for the Kafka consumers in 
[FLINK-4022](https://issues.apache.org/jira/browse/FLINK-4022).

I suggest reviewing this PR after #2369 to reduce the effort in getting the 
0.10 Kafka consumer in first. Tests for the new functionality will be added in 
follow-up commits.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4280

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2509


commit f1d24806d902a45f66fc9b42a19a303a031b81b1
Author: Tzu-Li (Gordon) Tai 
Date:   2016-09-17T13:41:50Z

[FLINK-4280][kafka-connector] Explicit start position configuration for 
Kafka Consumer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---