[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2214




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164370
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Oh I see, right :) Okay! Thanks for fixing it for me!




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-09 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70164121
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Ah, sorry, it's not about the method name.
I meant `kinesis = kinesis.assignTimestampsAndWatermarks()`.
The problem is that `assignTimestampsAndWatermarks()` returns a new stream with the assigned timestamps.
So doing
```java
kinesis.assignTimestampsAndWatermarks();
kinesis.timeWindow(); // <-- this time window won't get the watermarks
```
won't work.
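
In other words, the intended pattern is something like this (a minimal sketch based on the doc snippet above; `CustomTimestampAssigner` is just the placeholder assigner from the docs, and the window call is only illustrative):

```java
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));

// keep the stream returned by assignTimestampsAndWatermarks(); that is the
// stream carrying the assigned timestamps and watermarks
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());

// windows applied to the reassigned stream now receive the watermarks
kinesis.timeWindowAll(Time.seconds(10));
```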




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70161839
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

Thanks Robert.
I'm not quite sure what the problem is with using `assignTimestampsAndWatermarks()` here; can you explain? I looked at the code, and from my understanding there's not much difference from `assignTimestamps()` except that `assignTimestamps()` is deprecated.




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70103527
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
+or use one from the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
+it can be passed to the consumer in the following way:
+
+
+
+{% highlight java %}
+DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
+kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
--- End diff --

There is one minor thing here: you have to do `kinesis = kinesis.assignTS()` in order for it to work properly.
But I'll fix it while merging.




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70063862
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Good point. Added!




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059358
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
  * This method is called by {@link ShardConsumer}s.
  *
  * @param record the record to collect
+* @param recordTimestamp timestamp to attach to the collected record
  * @param shardStateIndex index of the shard to update in subscribedShardsState;
  *                        this index should be the returned value from
  *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
  *                        when the shard state was registered.
  * @param lastSequenceNumber the last sequence number value to update
  */
-   protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+   protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collect(record);
--- End diff --

Ah, sorry, this is actually wrong. It should be using `sourceContext.collectWithTimestamp()`; I missed this.
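
Something along these lines should do it (a sketch only, reusing the parameter names from the diff above; the rest of the method body stays as it is):

```java
protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        // attach the Kinesis approximate arrival timestamp to the emitted record
        sourceContext.collectWithTimestamp(record, recordTimestamp);
        // ... the existing shard state update stays under the same checkpoint lock
    }
}
```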




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70059282
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -491,13 +491,14 @@ protected Properties getConsumerConfiguration() {
  * This method is called by {@link ShardConsumer}s.
  *
  * @param record the record to collect
+* @param recordTimestamp timestamp to attach to the collected record
  * @param shardStateIndex index of the shard to update in subscribedShardsState;
  *                        this index should be the returned value from
  *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
  *                        when the shard state was registered.
  * @param lastSequenceNumber the last sequence number value to update
  */
-   protected void emitRecordAndUpdateState(T record, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+   protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collect(record);
--- End diff --

Did you also consider passing the record to the `sourceContext.collectWithTimestamp()` method in addition to passing it to the serialization schema?




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2214#discussion_r70058907
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -146,6 +146,29 @@ Also note that Flink can only restart the topology if enough processing slots ar
 Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
 Flink on YARN supports automatic restart of lost YARN containers.
 
+ Event Time for Consumed Records
+
+
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+
+
+
+If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
+timestamps, an *approximate arrival timestamp* will be used. This timestamp is attached to records by Kinesis once they
+were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
+ascending).
--- End diff --

Maybe we can also add a sentence saying that users can override the timestamp if they want to extract their own event-time timestamp.
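
For illustration, such an override could look roughly like this (a hypothetical assigner; the name mirrors the `CustomTimestampAssigner` placeholder later used in the docs, and it simply keeps the timestamp already attached by the consumer while emitting trailing watermarks):

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomTimestampAssigner implements AssignerWithPeriodicWatermarks<String> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // previousElementTimestamp should be the Kinesis approximate arrival
        // timestamp attached by the consumer; a user could instead parse an
        // event-time timestamp out of the element itself here
        currentMaxTimestamp = Math.max(currentMaxTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // emit a watermark slightly behind the largest timestamp seen so far
        return currentMaxTimestamp == Long.MIN_VALUE ? null : new Watermark(currentMaxTimestamp - 1);
    }
}
```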




[GitHub] flink pull request #2214: [FLINK-4019][kinesis-connector] Use Kinesis record...

2016-07-07 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2214

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

This Kinesis-provided timestamp is used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time timestamp
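
As a rough illustration of (1), a user-defined schema could surface the arrival timestamp next to the payload. This is only a hypothetical sketch: the exact `deserialize()` parameter names and ordering are assumptions based on this PR's description, so the actual interface in the connector is authoritative.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class StringWithArrivalTimeSchema implements KinesisDeserializationSchema<Tuple2<String, Long>> {

    @Override
    public Tuple2<String, Long> deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // pair the payload with the Kinesis server-side approximate arrival timestamp
        return Tuple2.of(new String(recordValue, StandardCharsets.UTF_8), approxArrivalTimestamp);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
    }
}
```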

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4019

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2214


commit c71666f53dd666d11f85b0303b586cbb38175963
Author: Gordon Tai 
Date:   2016-07-08T00:46:31Z

[FLINK-4019][kinesis-connector] Use Kinesis records' approximateArrivalTimestamp

Used in the following:
1) Exposed through the KinesisDeserializationSchema for users to obtain
2) Attached to records as the default event time



