[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2214 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70164370 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). 
After doing so, +it can be passed to the consumer in the following way: + + + +{% highlight java %} +DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( +"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); --- End diff -- Oh I see, right :) Okay! Thanks for fixing it for me!
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70164121 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). 
After doing so, +it can be passed to the consumer in the following way: + + + +{% highlight java %} +DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( +"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); --- End diff -- Ah, sorry, it's not about the method name. I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`. The problem is that `assignTimestampsAndWatermarks()` returns a new stream with assigned timestamps. So doing

```java
kinesis.assignTimestampsAndWatermarks();
kinesis.timeWindow(); // <-- this time window won't get the watermarks
```

won't work.
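The reassignment point can be illustrated without Flink on the classpath. The sketch below is a toy model only: `ToyStream` and `ReassignDemo` are hypothetical names, not Flink classes, and the only assumption carried over from the comment is that the transformation returns a new stream object instead of changing the one it is called on.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a stream handle; hypothetical, not a Flink class. */
final class ToyStream {
    private final List<String> appliedOperators;

    ToyStream(List<String> appliedOperators) {
        // Defensive copy so each handle owns its own operator list.
        this.appliedOperators = new ArrayList<>(appliedOperators);
    }

    /** Returns a NEW stream with the operator appended; the receiver is unchanged. */
    ToyStream assignTimestampsAndWatermarks() {
        List<String> next = new ArrayList<>(appliedOperators);
        next.add("timestamps");
        return new ToyStream(next);
    }

    boolean hasTimestamps() {
        return appliedOperators.contains("timestamps");
    }
}

class ReassignDemo {
    public static void main(String[] args) {
        List<String> initial = new ArrayList<>();
        initial.add("source");
        ToyStream kinesis = new ToyStream(initial);

        // Result discarded: the original handle still has no timestamps.
        kinesis.assignTimestampsAndWatermarks();
        System.out.println(kinesis.hasTimestamps()); // prints false

        // Result kept: downstream operators now see the timestamped stream.
        kinesis = kinesis.assignTimestampsAndWatermarks();
        System.out.println(kinesis.hasTimestamps()); // prints true
    }
}
```

In the documentation snippet under review, the same reasoning suggests keeping the returned stream, for example by reassigning `kinesis` as described in the comment.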
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70161839 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). 
After doing so, +it can be passed to the consumer in the following way: + + + +{% highlight java %} +DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( +"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); --- End diff -- Thanks Robert. I'm not quite sure about the problem with using `assignTimestampsAndWatermarks()` here, can you explain? I looked at the code, and from my understanding there's not much difference with `assignTimestamps()` except that `assignTimestamps()` is deprecated.
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70103527 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). + +Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), +or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). 
After doing so, +it can be passed to the consumer in the following way: + + + +{% highlight java %} +DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>( +"kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig)); +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner()); --- End diff -- There is one minor thing here, you have to do kinesis = kinesis.assignTS() in order to work properly. But I'll fix it while merging.
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70063862 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). --- End diff -- Good point. Added!
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70059358 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() { * This method is called by {@link ShardConsumer}s. * * @param record the record to collect +* @param recordTimestamp timestamp to attach to the collected record * @param shardStateIndex index of the shard to update in subscribedShardsState; *this index should be the returned value from *{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called *when the shard state was registered. * @param lastSequenceNumber the last sequence number value to update */ - protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { sourceContext.collect(record); --- End diff -- Ah, sorry, this is actually wrong. Should be using `sourceContext.collectWithTimestamp()`, missed this.
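To make the difference between the two emit calls concrete, here is a minimal self-contained sketch. It does not use Flink: `ToySourceContext`, `RecordingContext`, and `ToyFetcher` are hypothetical stand-ins, the sequence number is simplified to a `String`, and the shard state index from the real method is left out. Only the shape of `emitRecordAndUpdateState` and the `collect` / `collectWithTimestamp` method names come from the diff and comment above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the two emit methods discussed above. */
interface ToySourceContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
}

/** Records what was emitted; a null timestamp means none was attached. */
final class RecordingContext<T> implements ToySourceContext<T> {
    final List<T> elements = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();

    @Override
    public void collect(T element) {
        elements.add(element);
        timestamps.add(null); // the timestamp is lost on this path
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        elements.add(element);
        timestamps.add(timestamp);
    }
}

/** Simplified fetcher: emits a record and updates state under one lock. */
final class ToyFetcher<T> {
    private final Object checkpointLock = new Object();
    private final ToySourceContext<T> sourceContext;
    private String lastSequenceNumber; // simplified; the real type differs

    ToyFetcher(ToySourceContext<T> sourceContext) {
        this.sourceContext = sourceContext;
    }

    void emitRecordAndUpdateState(T record, long recordTimestamp, String sequenceNumber) {
        synchronized (checkpointLock) {
            // Attach the timestamp instead of calling collect(record).
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            lastSequenceNumber = sequenceNumber;
        }
    }

    String getLastSequenceNumber() {
        synchronized (checkpointLock) {
            return lastSequenceNumber;
        }
    }
}

class EmitDemo {
    public static void main(String[] args) {
        RecordingContext<String> ctx = new RecordingContext<>();
        ToyFetcher<String> fetcher = new ToyFetcher<>(ctx);
        fetcher.emitRecordAndUpdateState("record-a", 1467938791000L, "seq-1");
        System.out.println(ctx.timestamps.get(0)); // prints 1467938791000
    }
}
```

Emitting and updating the sequence number inside the same synchronized block keeps the emitted record and the recorded position consistent with each other, which is the reason the toy keeps the lock from the diff.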
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70059282 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() { * This method is called by {@link ShardConsumer}s. * * @param record the record to collect +* @param recordTimestamp timestamp to attach to the collected record * @param shardStateIndex index of the shard to update in subscribedShardsState; *this index should be the returned value from *{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called *when the shard state was registered. * @param lastSequenceNumber the last sequence number value to update */ - protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { sourceContext.collect(record); --- End diff -- Did you also consider passing the record to the `sourceContext.collectWithTimestamp()` method in addition to passing it to the serialization schema?
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2214#discussion_r70058907 --- Diff: docs/apis/streaming/connectors/kinesis.md --- @@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers. + Event Time for Consumed Records + + + +{% highlight java %} +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +{% endhighlight %} + + +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +{% endhighlight %} + + + +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record +timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they +were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be +ascending). --- End diff -- Maybe we can also add a sentence saying that users can override the timestamp if they want to extract their own event-time timestamp.
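As a rough illustration of what overriding the default timestamp could look like, the sketch below extracts an event-time timestamp from the record payload and falls back to the previously attached (arrival) timestamp when none can be parsed. Everything here is an assumption made for illustration: `ToyTimestampAssigner` and `PayloadTimestampAssigner` are hypothetical names rather than Flink interfaces, and the `"<epochMillis>,<payload>"` record format is invented.

```java
/** Hypothetical stand-in for a timestamp assigner; not a Flink interface. */
interface ToyTimestampAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

/** Assumes records look like "<epochMillis>,<payload>"; the format is illustrative only. */
final class PayloadTimestampAssigner implements ToyTimestampAssigner<String> {
    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        int comma = element.indexOf(',');
        if (comma <= 0) {
            // No timestamp prefix: keep the timestamp already attached to the record.
            return previousElementTimestamp;
        }
        try {
            return Long.parseLong(element.substring(0, comma));
        } catch (NumberFormatException e) {
            // Unparseable prefix: also fall back to the attached timestamp.
            return previousElementTimestamp;
        }
    }
}

class AssignerDemo {
    public static void main(String[] args) {
        ToyTimestampAssigner<String> assigner = new PayloadTimestampAssigner();
        long arrivalTimestamp = 1467938791000L; // stands in for the approximate arrival timestamp

        System.out.println(assigner.extractTimestamp("1467930000000,click", arrivalTimestamp)); // prints 1467930000000
        System.out.println(assigner.extractTimestamp("no-timestamp", arrivalTimestamp));        // prints 1467938791000
    }
}
```

Falling back to the attached timestamp is one possible design choice; a real job might instead drop or reroute records whose event time cannot be determined.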
[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/2214

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

This Kinesis-provided timestamp is used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time timestamp

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4019

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2214

commit c71666f53dd666d11f85b0303b586cbb38175963
Author: Gordon Tai
Date: 2016-07-08T00:46:31Z

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

Used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time