[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2018-05-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3915


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2018-01-10 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r160676392
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -34,6 +38,13 @@
  */
 public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 
+   private Date startupDate;
--- End diff --

Passing in the startup date to the API call bridge constructor seems to be 
very confusing ...


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2018-01-10 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r160677582
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -48,4 +59,18 @@ public void seekPartitionToBeginning(KafkaConsumer 
consumer, TopicPartitio
public void seekPartitionToEnd(KafkaConsumer consumer, 
TopicPartition partition) {
consumer.seekToEnd(Collections.singletonList(partition));
}
+
+   @Override
+   public void seekPartitionToDate(KafkaConsumer consumer, 
TopicPartition partition) {
--- End diff --

But from here I can understand why.

Ideally, this method signature should really be
`seekPartitionToDate(KafkaConsumer, TopicPartition, Date)`, but that would
require the startup date to be passed all the way down to the `KafkaConsumerThread`.
It also runs into the awkward fact that the `KafkaConsumerThread` lives in the
Kafka 0.9 module, while 0.9 doesn't support timestamp-based offsets ...
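
For illustration, a minimal sketch of what that `seekPartitionToDate(KafkaConsumer, TopicPartition, Date)` signature could look like, assuming the 0.10.1.0+ client so that `KafkaConsumer#offsetsForTimes()` is available. The class and method below are hypothetical stand-ins, not the PR's actual code; the fallback to the latest offset mirrors the behaviour described in the PR.

```
import java.util.Collections;
import java.util.Date;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: the Date is passed per call instead of through the
// call bridge constructor, which keeps the bridge itself stateless.
public class TimestampSeekSketch {

	public static void seekPartitionToDate(
			KafkaConsumer<?, ?> consumer, TopicPartition partition, Date date) {

		Map<TopicPartition, Long> timestampToSearch =
				Collections.singletonMap(partition, date.getTime());

		// offsetsForTimes() requires the 0.10.1.0+ Kafka client
		OffsetAndTimestamp result = consumer.offsetsForTimes(timestampToSearch).get(partition);

		if (result != null) {
			// earliest offset whose timestamp is >= the requested date
			consumer.seek(partition, result.offset());
		} else {
			// no such offset exists; fall back to the latest offset, as the PR does
			consumer.seekToEnd(Collections.singletonList(partition));
		}
	}
}
```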


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-30 Thread zjureel
Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124977924
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

In fact we do need to override this in 0.10 here. `FlinkKafkaConsumer010`
extends `FlinkKafkaConsumer09`, and an exception would be thrown by the
`setStartFromSpecificDate` of `FlinkKafkaConsumer09`.
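
To make the inheritance issue concrete, here is a compressed, self-contained analogue (class names and bodies are simplified stand-ins, not the PR's actual code): the 0.9 version rejects the call, so the 0.10 subclass has to override it even though its body ends up looking like the base implementation.

```
import java.util.Date;

// Simplified stand-ins for FlinkKafkaConsumer09 / FlinkKafkaConsumer010; the real
// classes carry much more state, only the override relationship is shown here.
class Consumer09Sketch {
	public Consumer09Sketch setStartFromSpecificDate(Date date) {
		// 0.9 brokers have no timestamp index, so the 0.9 consumer must reject this
		throw new UnsupportedOperationException(
				"Starting from a specific date is not supported for Kafka 0.9.");
	}
}

class Consumer010Sketch extends Consumer09Sketch {
	private Date specificStartupDate;

	@Override
	public Consumer010Sketch setStartFromSpecificDate(Date date) {
		// without this override, 0.10 users would hit the 0.9 exception above,
		// even though 0.10 does support timestamp-based start offsets
		this.specificStartupDate = date;
		return this;
	}
}
```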


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964821
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
+   this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+   this.specificStartupDate = date;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Convert flink topic partition to kafka topic partition.
+* @param flinkTopicPartitionMap
+* @return
+*/
+   private Map 
convertFlinkToKafkaTopicPartition(Map 
flinkTopicPartitionMap) {
+   Map topicPartitionMap = new 
HashMap<>(flinkTopicPartitionMap.size());
+   for (Map.Entry entry : 
flinkTopicPartitionMap.entrySet()) {
+   topicPartitionMap.put(new 
TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), 
entry.getValue());
+   }
+
+   return topicPartitionMap;
+
+   }
+
+   /**
+* Search offset from timestamp for each topic in kafka. If no offset 
exist, use the latest offset.
+* @param partitionTimesMap Kafka topic partition and timestamp
+* @return Kafka topic partition and the earliest offset after the 
timestamp. If no offset exist, use the latest offset in kafka
+*/
+   private Map 
convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That 
would benefit the readability / flow of the code.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964762
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
+   this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+   this.specificStartupDate = date;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Convert flink topic partition to kafka topic partition.
+* @param flinkTopicPartitionMap
+* @return
+*/
+   private Map 
convertFlinkToKafkaTopicPartition(Map 
flinkTopicPartitionMap) {
+   Map topicPartitionMap = new 
HashMap<>(flinkTopicPartitionMap.size());
+   for (Map.Entry entry : 
flinkTopicPartitionMap.entrySet()) {
+   topicPartitionMap.put(new 
TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), 
entry.getValue());
+   }
+
+   return topicPartitionMap;
+
--- End diff --

unnecessary empty line


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124966020
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
+   this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+   this.specificStartupDate = date;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Convert flink topic partition to kafka topic partition.
+* @param flinkTopicPartitionMap
+* @return
+*/
+   private Map 
convertFlinkToKafkaTopicPartition(Map 
flinkTopicPartitionMap) {
+   Map topicPartitionMap = new 
HashMap<>(flinkTopicPartitionMap.size());
+   for (Map.Entry entry : 
flinkTopicPartitionMap.entrySet()) {
+   topicPartitionMap.put(new 
TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), 
entry.getValue());
+   }
+
+   return topicPartitionMap;
+
+   }
+
+   /**
+* Search offset from timestamp for each topic in kafka. If no offset 
exist, use the latest offset.
+* @param partitionTimesMap Kafka topic partition and timestamp
+* @return Kafka topic partition and the earliest offset after the 
timestamp. If no offset exist, use the latest offset in kafka
+*/
+   private Map 
convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Of course, this would entail that we need to encode the timestamp into a 
`KafkaTopicPartitionStateSentinel`.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124965318
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

I don't think you need to override this in 0.10, right?
The implementation is basically identical to the base implementation.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964448
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

cool, thanks!


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964813
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
+   this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+   this.specificStartupDate = date;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Convert flink topic partition to kafka topic partition.
+* @param flinkTopicPartitionMap
+* @return
+*/
+   private Map 
convertFlinkToKafkaTopicPartition(Map 
flinkTopicPartitionMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That 
would benefit the readability / flow of the code.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124966340
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -697,13 +738,19 @@ protected static void 
initializeSubscribedPartitionsToStartOffsets(
int indexOfThisSubtask,
int numParallelSubtasks,
StartupMode startupMode,
+   Date specificStartupDate,
Map specificStartupOffsets) {
 
for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
if (i % numParallelSubtasks == indexOfThisSubtask) {
-   if (startupMode != 
StartupMode.SPECIFIC_OFFSETS) {
-   
subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), 
startupMode.getStateSentinel());
-   } else {
+   if (startupMode == 
StartupMode.SPECIFIC_TIMESTAMP) {
+   if (specificStartupDate == null) {
+   throw new 
IllegalArgumentException(
+   "Startup mode for the 
consumer set to " + StartupMode.SPECIFIC_TIMESTAMP +
+   ", but no 
specific timestamp were specified");
+   }
+   
subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), 
specificStartupDate.getTime());
--- End diff --

This is the main problem:
following the original design pattern, it would be better to place a
`KafkaTopicPartitionStateSentinel` here instead of eagerly converting the
`Date` to a specific offset. The date would only be converted to specific offsets
when we're about to start consuming the partition (i.e. in the `KafkaConsumerThread`).
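
As a self-contained illustration of that "store a sentinel now, resolve it lazily later" pattern (all names below are hypothetical and no Flink or Kafka classes are involved): initialization only records a sentinel, and the timestamp-to-offset lookup is deferred until the consuming side first sees it.

```
import java.util.HashMap;
import java.util.Map;

// Toy demonstration of the lazy-resolution shape suggested above, independent
// of the actual Flink classes.
public class LazySentinelDemo {

	// illustrative sentinel, playing the role of a KafkaTopicPartitionStateSentinel value
	static final long SPECIFIC_TIMESTAMP_SENTINEL = Long.MIN_VALUE;

	public static void main(String[] args) {
		Map<String, Long> partitionToStartOffset = new HashMap<>();

		// initialization (cheap, no broker access): just mark the partition
		partitionToStartOffset.put("my-topic-0", SPECIFIC_TIMESTAMP_SENTINEL);

		// later, in the consuming thread: resolve the sentinel to a real offset
		long startupTimestamp = System.currentTimeMillis() - 3_600_000L; // e.g. one hour ago
		partitionToStartOffset.replaceAll((partition, offset) ->
				offset == SPECIFIC_TIMESTAMP_SENTINEL
						? lookUpOffsetForTimestamp(partition, startupTimestamp)
						: offset);

		System.out.println(partitionToStartOffset);
	}

	// stand-in for the real lookup, which would use KafkaConsumer#offsetsForTimes()
	static long lookUpOffsetForTimestamp(String partition, long timestamp) {
		return 42L; // placeholder value
	}
}
```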


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124965642
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
+   this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+   this.specificStartupDate = date;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Convert flink topic partition to kafka topic partition.
+* @param flinkTopicPartitionMap
+* @return
+*/
+   private Map 
convertFlinkToKafkaTopicPartition(Map 
flinkTopicPartitionMap) {
+   Map topicPartitionMap = new 
HashMap<>(flinkTopicPartitionMap.size());
+   for (Map.Entry entry : 
flinkTopicPartitionMap.entrySet()) {
+   topicPartitionMap.put(new 
TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), 
entry.getValue());
+   }
+
+   return topicPartitionMap;
+
+   }
+
+   /**
+* Search offset from timestamp for each topic in kafka. If no offset 
exist, use the latest offset.
+* @param partitionTimesMap Kafka topic partition and timestamp
+* @return Kafka topic partition and the earliest offset after the 
timestamp. If no offset exist, use the latest offset in kafka
+*/
+   private Map 
convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

I think we need to move this conversion logic to `KafkaConsumerThread`,
otherwise we would be instantiating a `KafkaConsumer` just for the sake of
fetching timestamp-based offsets.
That's where the "`KafkaTopicPartitionStateSentinel` to actual offset"
conversions take place.
See `KafkaConsumerThread` lines 369 - 390:
```
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//       been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
	if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
		consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
		consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
		// the KafkaConsumer by default will automatically seek the consumer position
		// to the committed group offset, so we do not need to do it.
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else {
		consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
	}
}
```


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-23 Thread zjureel
GitHub user zjureel reopened a pull request:

https://github.com/apache/flink/pull/3915

[FLINK-6352] Support to use timestamp to set the initial offset of kafka

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3915


commit 53eaea8e73ee704e0d344fee85a67286191c6bde
Author: zjureel 
Date:   2017-06-23T08:16:49Z

[FLINK-6499] FlinkKafkaConsumer should support to use timestamp to set up 
start offset




---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-06-23 Thread zjureel
Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/3915


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-18 Thread zjureel
Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117206216
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, 
KeyedDeserializationSchema d
validateAutoOffsetResetValue(props);
}
 
+   /**
+* Search offset from timestamp for each topic in kafka. If no offset 
exist, use the latest offset.
+*
+* @param partitionTimesMap Kafka topic partition and timestamp
+* @return Kafka topic partition and the earliest offset after the 
timestamp. If no offset exist, use the latest offset in kafka
+*/
+   private Map 
convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Indeed, a user might be confused by the new method when using both Kafka 0.8
and 0.9. Keeping new functionality backwards compatible makes for a better
experience; I think this method could be added once timestamps are supported
by both 0.8 and 0.9.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175607
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ---
@@ -171,6 +170,11 @@ public FlinkKafkaConsumer09(List topics, 
KeyedDeserializationSchema d
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   throw new RuntimeException("This method dose not support for 
version 0.8 of Kafka");
--- End diff --

Do you mean 0.9? Also, typo on "dose".
I would also suggest being more specific: "Starting from a specific date is
not supported for Kafka version xx".


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175760
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -311,12 +311,14 @@ public FlinkKafkaConsumerBase(List topics, 
KeyedDeserializationSchema
 * from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
 * savepoint, only the offsets in the restored state will be used.
 *
-* Note: The api is supported by kafka version >= 0.10 only.
-*
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
-   throw new RuntimeException("This method supports kafka version 
>= 0.10 only.");
+   Preconditions.checkArgument(null != date && date.getTime() <= 
System.currentTimeMillis(), "Startup time must before curr time.");
--- End diff --

must "be" before.
Could you also add the errorneous time to the error message?
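
Something along these lines, for example (a sketch only, assuming the `%s` message-template overload of `org.apache.flink.util.Preconditions` and keeping the PR's condition):

```
import java.util.Date;

import org.apache.flink.util.Preconditions;

// Sketch of the suggested check: same condition as in the PR, but the error
// message now reports the offending value via "%s"-style templating.
public class StartupDateCheckSketch {

	public static void checkStartupDate(Date date) {
		Preconditions.checkArgument(
				date != null && date.getTime() <= System.currentTimeMillis(),
				"Startup time %s must be before the current time %s.",
				date, new Date());
	}
}
```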


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175199
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, 
KeyedDeserializationSchema d
validateAutoOffsetResetValue(props);
}
 
+   /**
+* Search offset from timestamp for each topic in kafka. If no offset 
exist, use the latest offset.
+*
+* @param partitionTimesMap Kafka topic partition and timestamp
+* @return Kafka topic partition and the earliest offset after the 
timestamp. If no offset exist, use the latest offset in kafka
+*/
+   private Map 
convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Actually I think let's just disable the timestamp option for 0.8.

I just think it's a bit strange that the functionality is there for 0.8 and
0.10, but skipped for 0.9.

Sorry for jumping back and forth here, trying to figure out what would be
most natural.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-17 Thread zjureel
Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117162289
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

The dependency tree of 0.10.0.1 and 0.10.1.0 is the same when I use mvn 
dependency:tree to print the dependency information:
+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile

+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675159
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, 
KeyedDeserializationSchema
public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
+   this.specificStartupDate = null;
return this;
}
 
/**
+* Specifies the consumer to start reading partitions from specific 
date. The specified date must before curr timestamp.
+* This lets the consumer ignore any committed group offsets in 
Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is 
greater than or equal to the specific date from the kafka.
--- End diff --

the kafka --> just "Kafka", with K capitalized.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674925
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ---
@@ -181,12 +181,6 @@ public FlinkKafkaConsumer09(List topics, 
KeyedDeserializationSchema d
 
boolean useMetrics = !PropertiesUtil.getBoolean(properties, 
KEY_DISABLE_METRICS, false);
 
-   // make sure that auto commit is disabled when our offset 
commit mode is ON_CHECKPOINTS;
-   // this overwrites whatever setting the user configured in the 
properties
-   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || 
offsetCommitMode == OffsetCommitMode.DISABLED) {
-   
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-   }
--- End diff --

This shouldn't be removed (I assume you accidentally removed it when 
rebasing?).


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674594
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -53,6 +53,10 @@ under the License.
org.apache.kafka

kafka_${scala.binary.version}

+   
+   org.apache.kafka
+   kafka-clients
+   
--- End diff --

Could you explain a bit why this is needed now? Thanks :)


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116676798
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, 
KeyedDeserializationSchema
public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
+   this.specificStartupDate = null;
return this;
}
 
/**
+* Specifies the consumer to start reading partitions from specific 
date. The specified date must before curr timestamp.
+* This lets the consumer ignore any committed group offsets in 
Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is 
greater than or equal to the specific date from the kafka.
+* If there's no such message, the consumer will use the latest offset 
to read data from kafka.
+*
+* This method does not effect where partitions are read from when the 
consumer is restored
+* from a checkpoint or savepoint. When the consumer is restored from a 
checkpoint or
+* savepoint, only the offsets in the restored state will be used.
+*
+* Note: The api is supported by kafka version >= 0.10 only.
+*
+* @return The consumer object, to allow function chaining.
+*/
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   throw new RuntimeException("This method supports kafka version 
>= 0.10 only.");
--- End diff --

If only 0.10 supports this, shouldn't we add it to the 
`FlinkKafkaConsumer010` class only?
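
For context, this is roughly how the new option would look from a user's perspective, based on the Javadoc quoted in the diff above. This is a sketch: `setStartFromSpecificDate` is the method proposed in this PR and its final name and placement may still change, and the connection properties are placeholders.

```
import java.util.Date;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class StartFromDateExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "my-group");

		FlinkKafkaConsumer010<String> consumer =
				new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

		// start from the earliest offsets whose timestamps are >= one hour ago;
		// partitions with no such offset would start from their latest offset
		consumer.setStartFromSpecificDate(new Date(System.currentTimeMillis() - 3_600_000L));

		DataStream<String> stream = env.addSource(consumer);
		stream.print();

		env.execute("start-from-date example");
	}
}
```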


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675083
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, 
KeyedDeserializationSchema
public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
+   this.specificStartupDate = null;
return this;
}
 
/**
+* Specifies the consumer to start reading partitions from specific 
date. The specified date must before curr timestamp.
--- End diff --

"curr" --> "current"
We usually avoid abbreviations like this in Javadoc.


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675211
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, 
KeyedDeserializationSchema
public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
+   this.specificStartupDate = null;
return this;
}
 
/**
+* Specifies the consumer to start reading partitions from specific 
date. The specified date must before curr timestamp.
+* This lets the consumer ignore any committed group offsets in 
Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is 
greater than or equal to the specific date from the kafka.
+* If there's no such message, the consumer will use the latest offset 
to read data from kafka.
--- End diff --

"message" --> "offset" is the term used in Kafka


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674541
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

Just to be sure: were there any additional dependencies as a result of this
bump?


---


[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...

2017-05-16 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3915

[FLINK-6352] Support to use timestamp to set the initial offset of kafka

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3915


commit e1f5aee8a471ef1f1e8cec3104807b22954b6a42
Author: zjureel 
Date:   2017-05-15T10:27:24Z

[FLINK-6352] Support to use timestamp to set the initial offset of kafka

commit 5d482c57ad19f0f9739fe5b40fe6e8713900e8a4
Author: zjureel 
Date:   2017-05-16T07:37:09Z

fix StreamExecutionEnvironment test




---