[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3915 ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r160676392

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
    @@ -34,6 +38,13 @@
      */
     public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
    +    private Date startupDate;
    --- End diff --

Passing in the startup date to the API call bridge constructor seems to be very confusing ... ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r160677582

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
    @@ -48,4 +59,18 @@ public void seekPartitionToBeginning(KafkaConsumer consumer, TopicPartitio
         public void seekPartitionToEnd(KafkaConsumer consumer, TopicPartition partition) {
             consumer.seekToEnd(Collections.singletonList(partition));
         }
    +
    +    @Override
    +    public void seekPartitionToDate(KafkaConsumer consumer, TopicPartition partition) {
    --- End diff --

But from here I can understand why. Ideally, this method signature should be `seekPartitionToDate(KafkaConsumer, TopicPartition, Date)`, but that would require the startup date to be passed all the way to the `KafkaConsumerThread`. This also exposes something that isn't nice: the `KafkaConsumerThread` lives within the Kafka 0.9 module, while 0.9 doesn't support timestamp-based offsets ... ---
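A minimal sketch of the signature suggested above, with the timestamp passed on each call instead of through the bridge constructor. Everything here is hypothetical: `ConsumerHandle` stands in for the few `KafkaConsumer` calls involved, partitions are plain strings, and `CallBridge09` / `CallBridge010` only mimic the shape of the real call bridges. With kafka-clients 0.10.1 or later, `offsetForTime` would typically be backed by `KafkaConsumer#offsetsForTimes`, which returns `null` for a partition that has no record at or after the requested timestamp.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the subset of KafkaConsumer that the bridge needs.
interface ConsumerHandle {
    // Earliest offset whose record timestamp is >= timestamp, or null if there is none.
    Long offsetForTime(String partition, long timestamp);

    long endOffset(String partition);

    void seek(String partition, long offset);
}

// Hypothetical bridge for versions without timestamp lookup: the call is rejected.
class CallBridge09 {
    void seekPartitionToTimestamp(ConsumerHandle consumer, String partition, long timestamp) {
        throw new UnsupportedOperationException(
                "Starting from a specific timestamp is not supported for Kafka version 0.9");
    }
}

// Hypothetical 0.10 bridge: the timestamp arrives as an argument, so the bridge stores no date.
class CallBridge010 extends CallBridge09 {
    @Override
    void seekPartitionToTimestamp(ConsumerHandle consumer, String partition, long timestamp) {
        Long offset = consumer.offsetForTime(partition, timestamp);
        // No record at or after the timestamp: fall back to the end of the partition.
        consumer.seek(partition, offset != null ? offset : consumer.endOffset(partition));
    }
}

// In-memory fake so the bridge can be exercised without a broker.
class FakeConsumer implements ConsumerHandle {
    final Map<String, Long> positions = new HashMap<>();

    public Long offsetForTime(String partition, long timestamp) {
        return timestamp <= 1000L ? Long.valueOf(42L) : null;
    }

    public long endOffset(String partition) {
        return 100L;
    }

    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}
```

Because the bridge holds no date, one instance can serve every partition regardless of the configured startup mode.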
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124977924

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    --- End diff --

In fact, we do need to override this in 0.10: `FlinkKafkaConsumer010` extends `FlinkKafkaConsumer09`, and `setStartFromSpecificDate` in `FlinkKafkaConsumer09` throws an exception. ---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124964821

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
    +        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
    +            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +        }
    +
    +        return topicPartitionMap;
    +
    +    }
    +
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map convertTimestampToOffset(Map partitionTimesMap) {
    --- End diff --

Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124964762

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
    +        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
    +            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +        }
    +
    +        return topicPartitionMap;
    +
    --- End diff --

Unnecessary empty line. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124966020

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
    +        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
    +            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +        }
    +
    +        return topicPartitionMap;
    +
    +    }
    +
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map convertTimestampToOffset(Map partitionTimesMap) {
    --- End diff --

Of course, this would entail that we need to encode the timestamp into a `KafkaTopicPartitionStateSentinel`. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124965318

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    --- End diff --

I don't think you need to override this in 0.10, right? The implementation is basically identical to the base implementation. ---
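Whether the 0.10 override can be dropped depends on what 0.9 does. If `FlinkKafkaConsumer09` overrides the method to throw, as in the 0.9 diff in this thread, then 0.10 does need its own override. A minimal sketch with hypothetical classes (`ConsumerBase`, `Consumer09`, `Consumer010`) that only mimic the shape of the real hierarchy:

```java
import java.util.Date;

// Hypothetical, simplified stand-ins for the consumer classes discussed in this review.
enum StartMode { GROUP_OFFSETS, SPECIFIC_TIMESTAMP }

class ConsumerBase {
    protected StartMode startupMode = StartMode.GROUP_OFFSETS;
    protected Date specificStartupDate;

    ConsumerBase setStartFromSpecificDate(Date date) {
        this.startupMode = StartMode.SPECIFIC_TIMESTAMP;
        this.specificStartupDate = date;
        return this;
    }
}

class Consumer09 extends ConsumerBase {
    @Override
    ConsumerBase setStartFromSpecificDate(Date date) {
        // Kafka 0.9 has no timestamp-based offset lookup, so the option is rejected.
        throw new UnsupportedOperationException(
                "Starting from a specific date is not supported for Kafka version 0.9");
    }
}

class Consumer010 extends Consumer09 {
    @Override
    ConsumerBase setStartFromSpecificDate(Date date) {
        // Without this override, the call would resolve to Consumer09 and throw.
        // super.setStartFromSpecificDate would also throw, so the base body is repeated.
        this.startupMode = StartMode.SPECIFIC_TIMESTAMP;
        this.specificStartupDate = date;
        return this;
    }
}
```

The duplicated body in `Consumer010` is the cost of this layout: it is "basically identical to the base implementation", but it cannot delegate to it.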
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124964448

    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
    -        0.10.0.1
    +        0.10.1.0
    --- End diff --

Cool, thanks! ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124964813

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
    --- End diff --

Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124966340

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -697,13 +738,19 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
                 int indexOfThisSubtask,
                 int numParallelSubtasks,
                 StartupMode startupMode,
    +            Date specificStartupDate,
                 Map specificStartupOffsets) {
             for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                 if (i % numParallelSubtasks == indexOfThisSubtask) {
    -                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
    -                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
    -                } else {
    +                if (startupMode == StartupMode.SPECIFIC_TIMESTAMP) {
    +                    if (specificStartupDate == null) {
    +                        throw new IllegalArgumentException(
    +                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_TIMESTAMP +
    +                            ", but no specific timestamp were specified");
    +                    }
    +                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), specificStartupDate.getTime());
    --- End diff --

This is the main problem: following the original design pattern, it would be better to place a `KafkaTopicPartitionStateSentinel` here instead of eagerly converting the `Date` to a specific offset. We should only convert the date to specific offsets when we're about to start consuming the partition (i.e. in the `KafkaConsumerThread`). ---
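Storing `specificStartupDate.getTime()` directly, as the diff does, puts a millisecond timestamp where a real offset is expected. Following the sentinel pattern instead, startup would record only a placeholder. A minimal sketch, assuming a hypothetical `TIMESTAMP` sentinel; the constants, the `Startup` enum and `StartOffsets` are illustrative and do not match the real `KafkaTopicPartitionStateSentinel` or `StartupMode`, and partitions are plain strings:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical placeholder values. They are kept far from -1 because a resolved
// "last consumed offset" can legitimately be -1 for an empty partition.
final class Sentinels {
    static final long EARLIEST_OFFSET = Long.MIN_VALUE + 1;
    static final long LATEST_OFFSET = Long.MIN_VALUE + 2;
    static final long GROUP_OFFSET = Long.MIN_VALUE + 3;
    // Hypothetical new sentinel meaning "resolve from the startup timestamp later".
    static final long TIMESTAMP = Long.MIN_VALUE + 4;

    private Sentinels() {}
}

enum Startup {
    EARLIEST(Sentinels.EARLIEST_OFFSET),
    LATEST(Sentinels.LATEST_OFFSET),
    GROUP_OFFSETS(Sentinels.GROUP_OFFSET),
    SPECIFIC_TIMESTAMP(Sentinels.TIMESTAMP);

    final long sentinel;

    Startup(long sentinel) {
        this.sentinel = sentinel;
    }
}

final class StartOffsets {
    // Assigns partitions to this subtask round-robin and records only a placeholder.
    // The timestamp itself is kept elsewhere and resolved later in the consumer thread.
    static Map<String, Long> initialize(
            List<String> partitions, int subtaskIndex, int parallelism, Startup mode) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                startOffsets.put(partitions.get(i), mode.sentinel);
            }
        }
        return startOffsets;
    }

    private StartOffsets() {}
}
```

No broker round-trip happens at this point, which is what allows the conversion to move into the thread that already owns a consumer.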
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124965642

    --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
    @@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    +        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
    +        this.specificStartupDate = date;
    +        this.specificStartupOffsets = null;
    +        return this;
    +    }
    +
    +    /**
    +     * Convert flink topic partition to kafka topic partition.
    +     * @param flinkTopicPartitionMap
    +     * @return
    +     */
    +    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
    +        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
    +        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
    +            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
    +        }
    +
    +        return topicPartitionMap;
    +
    +    }
    +
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map convertTimestampToOffset(Map partitionTimesMap) {
    --- End diff --

I think we need to move this conversion logic to `KafkaConsumerThread`; otherwise we would be instantiating a KafkaConsumer just for the sake of fetching timestamp-based offsets. That's where the actual "`KafkaTopicPartitionStateSentinel` to actual offset" conversions take place.
See `KafkaConsumerThread` lines 369 - 390:
```
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//       been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState newPartitionState : newPartitions) {
    if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
        consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
        consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
        // the KafkaConsumer by default will automatically seek the consumer position
        // to the committed group offset, so we do not need to do it.
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else {
        consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
    }
}
```
---
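With a timestamp sentinel, the loop above would gain one more branch. A minimal sketch, assuming a hypothetical `TIMESTAMP` sentinel and a startup timestamp handed to the thread; `SentinelResolver` and `PartitionLookup` are made-up stand-ins for the thread and its consumer (on 0.10.1+ clients the timestamp lookup would typically be `KafkaConsumer#offsetsForTimes`), and the group-offset branch is omitted:

```java
import java.util.Map;

final class SentinelResolver {
    // Hypothetical placeholder values, kept far from -1 (a valid "nothing consumed yet" offset).
    static final long EARLIEST_OFFSET = Long.MIN_VALUE + 1;
    static final long LATEST_OFFSET = Long.MIN_VALUE + 2;
    static final long TIMESTAMP = Long.MIN_VALUE + 4;

    // Hypothetical view of the lookups the consumer thread can make against the broker.
    interface PartitionLookup {
        long beginningOffset(String partition);

        long endOffset(String partition);

        // Earliest offset whose record timestamp is >= timestamp, or null if there is none.
        Long offsetForTime(String partition, long timestamp);
    }

    // Replaces placeholders with "last consumed" offsets (next position - 1), the same
    // convention as the setOffset(position - 1) calls in the quoted loop.
    static void resolve(Map<String, Long> state, PartitionLookup lookup, long startupTimestamp) {
        for (Map.Entry<String, Long> entry : state.entrySet()) {
            String partition = entry.getKey();
            long offset = entry.getValue();
            long nextPosition;
            if (offset == EARLIEST_OFFSET) {
                nextPosition = lookup.beginningOffset(partition);
            } else if (offset == LATEST_OFFSET) {
                nextPosition = lookup.endOffset(partition);
            } else if (offset == TIMESTAMP) {
                Long found = lookup.offsetForTime(partition, startupTimestamp);
                // No record at or after the timestamp: start from the end of the partition.
                nextPosition = found != null ? found : lookup.endOffset(partition);
            } else {
                continue; // already a concrete offset, nothing to resolve
            }
            entry.setValue(nextPosition - 1);
        }
    }

    private SentinelResolver() {}
}
```

Doing the lookup here reuses the consumer the thread already has, instead of instantiating one just to fetch timestamp-based offsets.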
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
GitHub user zjureel reopened a pull request: https://github.com/apache/flink/pull/3915

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3915

commit 53eaea8e73ee704e0d344fee85a67286191c6bde
Author: zjureel
Date:   2017-06-23T08:16:49Z

    [FLINK-6499] FlinkKafkaConsumer should support to use timestamp to set up start offset
---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/3915 ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117206216

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
             validateAutoOffsetResetValue(props);
         }
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     *
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map convertTimestampToOffset(Map partitionTimesMap) {
    --- End diff --

Indeed, users who use both Kafka 0.8 and 0.9 may be confused by the new method. Keeping new functionality consistent across versions is a better experience, so I think this method could be added once timestamps are supported by both 0.8 and 0.9. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117175607

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
    @@ -171,6 +170,11 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
         }
         @Override
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        throw new RuntimeException("This method dose not support for version 0.8 of Kafka");
    --- End diff --

Do you mean 0.9? Also, there is a typo: "dose". I would also suggest being more specific: "Starting from a specific date is not supported for Kafka version xx". ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117175760

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -311,12 +311,14 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
          * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
          * savepoint, only the offsets in the restored state will be used.
          *
    -     * Note: The api is supported by kafka version >= 0.10 only.
    -     *
          * @return The consumer object, to allow function chaining.
          */
         public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    -        throw new RuntimeException("This method supports kafka version >= 0.10 only.");
    +        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
    --- End diff --

It should read "must be before". Could you also add the erroneous time to the error message? ---
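A minimal sketch of the check with both fixes applied, written with a plain `IllegalArgumentException` so it runs without Flink on the classpath. `StartupDateCheck` is hypothetical, and its `nowMillis` parameter exists only to make the check testable; a caller would pass `System.currentTimeMillis()`. Splitting out the null case also avoids reporting a null date as a time-ordering error.

```java
import java.util.Date;

final class StartupDateCheck {
    // Validates a startup date against the given "now" and reports the offending value.
    static Date validate(Date date, long nowMillis) {
        if (date == null) {
            throw new IllegalArgumentException("Startup date must not be null.");
        }
        if (date.getTime() > nowMillis) {
            throw new IllegalArgumentException("Startup time " + date.getTime()
                    + " must not be later than the current time " + nowMillis + ".");
        }
        return date;
    }

    private StartupDateCheck() {}
}
```

As in the diff, a date exactly equal to the current time is accepted.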
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117175199

    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
             validateAutoOffsetResetValue(props);
         }
    +    /**
    +     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +     *
    +     * @param partitionTimesMap Kafka topic partition and timestamp
    +     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +     */
    +    private Map convertTimestampToOffset(Map partitionTimesMap) {
    --- End diff --

Actually, I think let's just disable the timestamp option for 0.8. It's a bit strange that the functionality is there for 0.8 and 0.10, but skipped for 0.9. Sorry for jumping back and forth here; I'm trying to figure out what would be most natural. ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r117162289

    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
    -        0.10.0.1
    +        0.10.1.0
    --- End diff --

When I print the dependency information with `mvn dependency:tree`, the dependency trees of 0.10.0.1 and 0.10.1.0 are the same:

    +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
    |  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
    |  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile

    +- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
    |  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
    |  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r116675159

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }
         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +     *
    +     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    --- End diff --

the kafka --> just "Kafka", with K capitalized. ---
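The lookup rule in this Javadoc can be illustrated with an in-memory partition. A minimal sketch, assuming offsets start at 0 and that `timestampsByOffset.get(i)` is the timestamp of the record at offset `i`; `TimestampLookup` is hypothetical, and a broker answers this query without a linear scan:

```java
import java.util.List;

final class TimestampLookup {
    /**
     * Returns the earliest offset whose record timestamp is >= targetTimestamp, or the
     * log-end offset (the next offset to be written) when no such record exists.
     */
    static long startOffsetFor(List<Long> timestampsByOffset, long targetTimestamp) {
        for (int offset = 0; offset < timestampsByOffset.size(); offset++) {
            if (timestampsByOffset.get(offset) >= targetTimestamp) {
                return offset;
            }
        }
        // No such record: start from the latest offset, i.e. only read new data.
        return timestampsByOffset.size();
    }

    private TimestampLookup() {}
}
```

With timestamps `[100, 200, 200, 300]`, a target of 150 or 200 yields offset 1, and a target of 301 yields 4, the position just past the last record.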
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r116674925

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
    @@ -181,12 +181,6 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
             boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
    -        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
    -        // this overwrites whatever setting the user configured in the properties
    -        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    -            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    -        }
    --- End diff --

This shouldn't be removed (I assume you accidentally removed it when rebasing?). ---
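The behavior the removed block enforces can be sketched with `java.util.Properties` alone. `CommitMode` and `AutoCommitGuard` are hypothetical: `ON_CHECKPOINTS` and `DISABLED` mirror the `OffsetCommitMode` values in the diff, `PERIODIC` is made up for contrast, and the key is the string value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`.

```java
import java.util.Properties;

// Hypothetical stand-in for Flink's offset commit modes; PERIODIC is made up for contrast.
enum CommitMode { DISABLED, ON_CHECKPOINTS, PERIODIC }

final class AutoCommitGuard {
    // Same string as Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    static void apply(Properties properties, CommitMode mode) {
        // With ON_CHECKPOINTS, Flink commits offsets itself; with DISABLED, nothing is committed.
        // Either way Kafka's own auto-commit is switched off, overriding the user's setting.
        if (mode == CommitMode.ON_CHECKPOINTS || mode == CommitMode.DISABLED) {
            properties.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
    }

    private AutoCommitGuard() {}
}
```

Dropping this step would let a user-supplied `enable.auto.commit=true` silently coexist with checkpoint-based commits.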
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r116674594

    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -53,6 +53,10 @@ under the License.
                 org.apache.kafka
                 kafka_${scala.binary.version}
    +
    +            org.apache.kafka
    +            kafka-clients
    +
    --- End diff --

Could you explain a bit why this is needed now? Thanks :) ---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r116676798

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }
         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +     *
    +     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    +     * If there's no such message, the consumer will use the latest offset to read data from kafka.
    +     *
    +     * This method does not effect where partitions are read from when the consumer is restored
    +     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
    +     * savepoint, only the offsets in the restored state will be used.
    +     *
    +     * Note: The api is supported by kafka version >= 0.10 only.
    +     *
    +     * @return The consumer object, to allow function chaining.
    +     */
    +    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
    +        throw new RuntimeException("This method supports kafka version >= 0.10 only.");
    --- End diff --

If only 0.10 supports this, shouldn't we add it to the `FlinkKafkaConsumer010` class only? ---
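The alternative raised here can be sketched with hypothetical classes (`SourceBase`, `Source09`, `Source010`): the option lives only on the 0.10 class, so calling it on 0.8/0.9 fails at compile time instead of throwing at runtime. The Javadoc folds in the wording fixes requested elsewhere in this review ("current" instead of "curr", capitalized "Kafka", "offset" instead of "message") and corrects "effect" to "affect". The inline validation is a simplified placeholder.

```java
import java.util.Date;

// Hypothetical sketch: the base class exposes no date option at all.
class SourceBase {
    protected Date specificStartupDate; // set only by versions that support it
}

class Source09 extends SourceBase {
    // No setStartFromSpecificDate here: the option simply does not exist for 0.9.
}

class Source010 extends Source09 {
    /**
     * Specifies that the consumer should start reading partitions from a specific date.
     * The date must not be later than the current time. This lets the consumer ignore
     * any committed group offsets in Zookeeper / Kafka brokers.
     *
     * The consumer looks up, in Kafka, the earliest offset whose timestamp is greater than
     * or equal to the date. If there is no such offset, it starts from the latest offset.
     *
     * This method does not affect where partitions are read from when the consumer is
     * restored from a checkpoint or savepoint.
     *
     * @param date the startup date
     * @return The consumer object, to allow function chaining.
     */
    Source010 setStartFromSpecificDate(Date date) {
        if (date == null || date.getTime() > System.currentTimeMillis()) {
            throw new IllegalArgumentException("Invalid startup date: " + date);
        }
        this.specificStartupDate = date;
        return this;
    }
}
```

Returning the concrete `Source010` type keeps chaining working without a cast.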
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675083

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }

         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    --- End diff --

    "curr" --> "current"
    We usually avoid abbreviations like this in Javadoc.
---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675211

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }

         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +     *
    +     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    +     * If there's no such message, the consumer will use the latest offset to read data from kafka.
    --- End diff --

    "message" --> "offset", which is the term used in Kafka.
---
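To make the lookup rule in the quoted Javadoc concrete, here is a stdlib-only model of a single partition, assuming its records are given in offset order as `(offset, timestampMs)` pairs. `StartOffsetLookup`, `Entry` and `startOffsetFor` are hypothetical names, not Flink or Kafka API. In the Kafka client itself the search is performed by the broker and exposed as `KafkaConsumer#offsetsForTimes` (kafka-clients 0.10.1.0 and later), which returns `null` for a partition with no offset at or after the requested timestamp; the sketch only mirrors that rule plus the "fall back to the latest offset" behavior proposed in the Javadoc.

```java
import java.util.List;

/** Hypothetical model of the start-from-date rule; NOT Kafka's or Flink's implementation. */
final class StartOffsetLookup {

    /** One record in a partition: its offset and its timestamp in epoch milliseconds. */
    static final class Entry {
        final long offset;
        final long timestampMs;

        Entry(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    private StartOffsetLookup() {}

    /**
     * Returns the earliest offset whose timestamp is >= startMs. If no offset qualifies,
     * returns logEndOffset, i.e. the latest offset (where the next record will be written).
     * A linear scan is used because record timestamps are not guaranteed to be monotonic.
     */
    static long startOffsetFor(List<Entry> partition, long logEndOffset, long startMs) {
        for (Entry e : partition) {      // entries are assumed to be in offset order
            if (e.timestampMs >= startMs) {
                return e.offset;         // first match in offset order = earliest offset
            }
        }
        return logEndOffset;             // no such offset: fall back to the latest offset
    }
}
```

For example, with timestamps 1000, 2000 and 3000 at offsets 0, 1 and 2 and a log end offset of 3, a start time of 1500 yields offset 1, while a start time of 5000 yields 3, the latest offset.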
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116674541

    --- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
    -        0.10.0.1
    +        0.10.1.0
    --- End diff --

    Just to be sure: were there any additional dependencies as a result of this bump?
---
[GitHub] flink pull request #3915: [FLINK-6352] Support to use timestamp to set the i...
GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/3915

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3915

----
commit e1f5aee8a471ef1f1e8cec3104807b22954b6a42
Author: zjureel
Date:   2017-05-15T10:27:24Z

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

commit 5d482c57ad19f0f9739fe5b40fe6e8713900e8a4
Author: zjureel
Date:   2017-05-16T07:37:09Z

    fix StreamExecutionEnvironment test

---