[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 5:46 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class as argument 
directly, if the user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too, as they can simply set 
configuration via setter methods if they use the configuration class directly.
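
A minimal sketch of this constructor layout (class, method, and key names here 
are assumptions for illustration, not the actual connector API):

{code:java}
import java.util.Properties;

// Hypothetical typed config that can be built from the user-supplied Properties.
class FlinkKinesisProducerConfig {
    private String region;

    static FlinkKinesisProducerConfig fromProperties(Properties props) {
        FlinkKinesisProducerConfig config = new FlinkKinesisProducerConfig();
        config.setRegion(props.getProperty("aws.region"));  // assumed key name
        return config;
    }

    void setRegion(String region) { this.region = region; }
    String getRegion() { return region; }
}

class FlinkKinesisProducerSketch {
    private final FlinkKinesisProducerConfig config;

    // Existing-style constructor: the given Properties are converted internally.
    FlinkKinesisProducerSketch(Properties props) {
        this(FlinkKinesisProducerConfig.fromProperties(props));
    }

    // New constructor: the typed configuration is passed directly.
    FlinkKinesisProducerSketch(FlinkKinesisProducerConfig config) {
        this.config = config;
    }
}
{code}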


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally? We use the given 
{{Properties}} to create the config class. We can also have a new constructor 
that takes the reworked configuration class directly, if the user chooses to 
use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too as they can simply set 
configuration using setter methods if they use the configuration class directly.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 5:46 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class directly, if the 
user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too, as they can simply set 
configuration via setter methods if they use the configuration class directly.


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally? We use the given 
{{Properties}} to create the config class. We can also have a new constructor 
that takes the reworked configuration class directly, if the user chooses to 
use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too as they can simply set 
configuration using setter methods. But does having a typed configuration class 
mean that we don't need a class like {{KinesisConfigConstants}} to hold key 
names anymore? I feel the approach will somewhat be conflicting with my idea of 
keeping the {{Properties}} constructors, so we need to decide which way to go.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372160#comment-15372160
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 4:30 AM:
-

I think I'll open a PR for FLINK-4170 first, to get a feel for whether this 
change is really necessary, and then decide whether we want to proceed with this.


was (Author: tzulitai):
I think I'll open a PR for FLINK-4170 first, to get a feel of if this change is 
really necessary and decide whether or not we should proceed with this.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372160#comment-15372160
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4195:


I think I'll open a PR for FLINK-4170 first, to get a feel for whether this 
change is really necessary, and then decide whether we should proceed with this.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372160#comment-15372160
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 4:30 AM:
-

I think I'll open a PR for FLINK-4170 first, to get a feel for whether this 
change is really necessary, and then decide whether we should proceed with this.


was (Author: tzulitai):
I think I'll open a PR for FLINK-4170 first, to get a feel if this change is 
really necessary and decide whether or not we should proceed with this.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372134#comment-15372134
 ] 

ASF GitHub Bot commented on FLINK-4018:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2071
  
@rmetzger Will you have time to help merge this PR too? I think the changes 
are good to go.


> Configurable idle time between getRecords requests to Kinesis shards
> 
>
> Key: FLINK-4018
> URL: https://issues.apache.org/jira/browse/FLINK-4018
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer is calling getRecords() right after finishing 
> previous calls. This results in easily reaching Amazon's limitation of 5 GET 
> requests per shard per second. Although the code already has a backoff & retry 
> mechanism, this will affect other applications consuming from the same 
> Kinesis stream.
> Along with this new configuration and the already existing 
> `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more 
> control over the desired throughput behaviour of the Kinesis consumer.
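
A rough sketch of the kind of fetch loop this describes (the class, field names, 
and config handling are assumptions for illustration, not the actual 
ShardConsumer code):

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

class ThrottledShardFetcher {
    private final AmazonKinesis kinesis;
    private final long fetchIntervalMillis;  // e.g. read from the consumer's Properties
    private volatile boolean running = true;

    ThrottledShardFetcher(AmazonKinesis kinesis, long fetchIntervalMillis) {
        this.kinesis = kinesis;
        this.fetchIntervalMillis = fetchIntervalMillis;
    }

    void run(String shardIterator) throws InterruptedException {
        while (running && shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator));
            // ... deserialize and emit result.getRecords() here ...
            shardIterator = result.getNextShardIterator();
            // Idle between consecutive calls so the consumer stays below
            // Amazon's limit of 5 GET requests per shard per second.
            Thread.sleep(fetchIntervalMillis);
        }
    }
}
{code}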



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2071: [FLINK-4018][streaming-connectors] Configurable idle time...

2016-07-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2071
  
@rmetzger Will you have time to help merge this PR too? I think the changes 
are good to go.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:35 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class directly, if the 
user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too, as they can simply set 
configuration via setter methods. But does having a typed configuration class 
mean that we don't need a class like {{KinesisConfigConstants}} to hold key 
names anymore? I feel this approach would somewhat conflict with my idea of 
keeping the {{Properties}} constructors, so we need to decide which way to go.
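
To make the tension concrete, a sketch of the two styles side by side (the key 
name is an assumption, not an actual constant):

{code:java}
import java.util.Properties;

public class ConfigStyles {
    public static void main(String[] args) {
        // Style 1: untyped Properties addressed via key-name constants,
        // which is what a class like KinesisConfigConstants exists for.
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");  // assumed key name

        // Style 2: a typed configuration class with setters. Every option is
        // a method, so no key-name constants are needed. But keeping the
        // Properties constructors then still requires a mapping from key
        // names to setters, which is where the two approaches pull in
        // opposite directions.
    }
}
{code}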


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally? We use the given 
{{Properties}} to create the config class. We can also have a new constructor 
that takes the reworked configuration class directly, if the user chooses to 
use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too. But does having a typed 
configuration class mean that we don't need a class like 
{{KinesisConfigConstants}} to hold key names anymore? I feel the approach will 
somewhat be conflicting with my idea of keeping the {{Properties}} 
constructors, so we need to decide which way to go.

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:35 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class directly, if the 
user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too. But does having a typed 
configuration class mean that we don't need a class like 
{{KinesisConfigConstants}} to hold key names anymore? I feel this approach 
would somewhat conflict with my idea of keeping the {{Properties}} 
constructors, so we need to decide which way to go.


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally? We use the given 
{{Properties}} to create the config class. We can also have a new constructor 
that takes the reworked configuration class directly, if the user chooses to 
use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too. But does having a typed 
configuration class mean that we don't need a class like 
{{KinesisConfigConstants}} to hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:32 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class directly, if the 
user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too. But does having a typed 
configuration class mean that we don't need a class like 
{{KinesisConfigConstants}} to hold key names anymore?


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally? We use the given 
{{Properties}} to create the config class. We can also have a new constructor 
that takes the reworked configuration class directly, if the user chooses to 
use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372127#comment-15372127
 ] 

ASF GitHub Bot commented on FLINK-4197:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2227
  
Thank you for opening a PR for this, @skidder! I'll review the changes soon.


> Allow Kinesis Endpoint to be Overridden via Config
> --
>
> Key: FLINK-4197
> URL: https://issues.apache.org/jira/browse/FLINK-4197
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.0.3
>Reporter: Scott Kidder
>Priority: Minor
>  Labels: easyfix
> Fix For: 1.0.4
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> I perform local testing of my application stack with Flink configured as a 
> consumer on a Kinesis stream provided by Kinesalite, an implementation of 
> Kinesis built on LevelDB. This requires me to override the AWS endpoint to 
> refer to my local Kinesalite server rather than reference the real AWS 
> endpoint. I'd like to add a configuration property to the Kinesis streaming 
> connector that allows the AWS endpoint to be specified explicitly.
> This should be a fairly small change and provide a lot of flexibility to 
> people looking to integrate Flink with Kinesis in a non-production setup.
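
A sketch of the intended usage (the endpoint key name and port here are 
assumptions; the actual constant is whatever the PR introduces):

{code:java}
import java.util.Properties;

public class LocalKinesaliteConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");
        // Point the connector at a local Kinesalite server instead of the
        // real AWS endpoint.
        props.setProperty("aws.endpoint", "https://localhost:4567");
    }
}
{code}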



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2227: [FLINK-4197] Allow Kinesis endpoint to be overridden via ...

2016-07-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2227
  
Thank you for opening a PR for this, @skidder! I'll review the changes soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:25 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally? We would 
use the given {{Properties}} to create the config class. We could also add a 
new constructor that takes the reworked configuration class directly, if the 
user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally?
We can also have a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle for the user too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:24 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally?
We could also add a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier for the user to handle too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally?
We can also have a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} will be nice, 
and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:23 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally?
We could also add a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like {{KinesisProducerConfiguration}} would be 
nice, and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?


was (Author: tzulitai):
What about keeping the current constructors that take {{Properties}} config as 
argument, but use the reworked configuration class internally?
We can also have a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like `KinesisProducerConfiguration` will be nice, 
and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4195:


What about keeping the current constructors that take a `Properties` config as 
argument, but using the reworked configuration class internally?
We could also add a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like `KinesisProducerConfiguration` would be nice, 
and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372124#comment-15372124
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4195 at 7/12/16 3:23 AM:
-

What about keeping the current constructors that take a {{Properties}} config 
as argument, but using the reworked configuration class internally?
We could also add a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like `KinesisProducerConfiguration` would be nice, 
and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?


was (Author: tzulitai):
What about keeping the current constructors that take `Properties` config as 
argument, but use the reworked configuration class internally?
We can also have a constructor that takes the reworked configuration class 
directly, if the user chooses to use that instead.

A typed configuration class like `KinesisProducerConfiguration` will be nice, 
and probably easier to handle, too.
But does having a typed configuration class mean that we don't need a class to 
hold key names anymore?

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation are quite messy and unconsolidated in the current state of the 
> code, and will likely get worse as more config options are added to the 
> Kinesis connector.
> I propose a dedicated configuration class (instead of only Java properties) 
> along the lines of Flink's own {{Configuration}}, so that the usage pattern 
> is similar. There will be separate configuration classes, 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372084#comment-15372084
 ] 

ASF GitHub Bot commented on FLINK-4197:
---

GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/2227

[FLINK-4197] Allow Kinesis endpoint to be overridden via config

I perform local testing of my application stack with Flink configured as a 
consumer on a Kinesis stream provided by Kinesalite, an implementation of 
Kinesis built on LevelDB. This requires me to override the AWS endpoint to 
refer to my local Kinesalite server rather than reference the real AWS 
endpoint. I'd like to add a configuration property to the Kinesis streaming 
connector that allows the AWS endpoint to be specified explicitly.

This should be a fairly small change and provide a lot of flexibility to 
people looking to integrate Flink with Kinesis in a non-production setup.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink kinesis-url-override

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2227


commit 0356141fbd826553cc206ac19dc4cfd734668c97
Author: Scott Kidder 
Date:   2016-07-12T02:20:55Z

[FLINK-4197] Allow Kinesis endpoint to be overridden via config




> Allow Kinesis Endpoint to be Overridden via Config
> --
>
> Key: FLINK-4197
> URL: https://issues.apache.org/jira/browse/FLINK-4197
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.0.3
>Reporter: Scott Kidder
>Priority: Minor
>  Labels: easyfix
> Fix For: 1.0.4
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> I perform local testing of my application stack with Flink configured as a 
> consumer on a Kinesis stream provided by Kinesalite, an implementation of 
> Kinesis built on LevelDB. This requires me to override the AWS endpoint to 
> refer to my local Kinesalite server rather than reference the real AWS 
> endpoint. I'd like to add a configuration property to the Kinesis streaming 
> connector that allows the AWS endpoint to be specified explicitly.
> This should be a fairly small change and provide a lot of flexibility to 
> people looking to integrate Flink with Kinesis in a non-production setup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2227: [FLINK-4197] Allow Kinesis endpoint to be overridd...

2016-07-11 Thread skidder
GitHub user skidder opened a pull request:

https://github.com/apache/flink/pull/2227

[FLINK-4197] Allow Kinesis endpoint to be overridden via config

I perform local testing of my application stack with Flink configured as a 
consumer on a Kinesis stream provided by Kinesalite, an implementation of 
Kinesis built on LevelDB. This requires me to override the AWS endpoint to 
refer to my local Kinesalite server rather than reference the real AWS 
endpoint. I'd like to add a configuration property to the Kinesis streaming 
connector that allows the AWS endpoint to be specified explicitly.

This should be a fairly small change and provide a lot of flexibility to 
people looking to integrate Flink with Kinesis in a non-production setup.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skidder/flink kinesis-url-override

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2227


commit 0356141fbd826553cc206ac19dc4cfd734668c97
Author: Scott Kidder 
Date:   2016-07-12T02:20:55Z

[FLINK-4197] Allow Kinesis endpoint to be overridden via config




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4197) Allow Kinesis Endpoint to be Overridden via Config

2016-07-11 Thread Scott Kidder (JIRA)
Scott Kidder created FLINK-4197:
---

 Summary: Allow Kinesis Endpoint to be Overridden via Config
 Key: FLINK-4197
 URL: https://issues.apache.org/jira/browse/FLINK-4197
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.0.3
Reporter: Scott Kidder
Priority: Minor
 Fix For: 1.0.4


I perform local testing of my application stack with Flink configured as a 
consumer on a Kinesis stream provided by Kinesalite, an implementation of 
Kinesis built on LevelDB. This requires me to override the AWS endpoint to 
refer to my local Kinesalite server rather than reference the real AWS 
endpoint. I'd like to add a configuration property to the Kinesis streaming 
connector that allows the AWS endpoint to be specified explicitly.

This should be a fairly small change and provide a lot of flexibility to people 
looking to integrate Flink with Kinesis in a non-production setup.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371842#comment-15371842
 ] 

ASF GitHub Bot commented on FLINK-1502:
---

Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/1947
  
@zentol is there an HTTP interface that can be used to query these metrics? 
Something similar to the existing JobManager accumulator URLs?


> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The metrics library makes it easy to expose collected metrics to other 
> systems such as Graphite, Ganglia, or Java's JVM (VisualVM).
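
For reference, a minimal standalone sketch of the Dropwizard metrics library 
exposing a metric via JMX (plain library usage, not Flink's wiring of it):

{code:java}
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;

public class JmxMetricsExample {
    public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();

        // Publish everything in the registry as JMX MBeans,
        // visible in VisualVM or JConsole.
        JmxReporter reporter = JmxReporter.forRegistry(registry).build();
        reporter.start();

        registry.meter("requests").mark();  // record one event
        Thread.sleep(60_000);               // keep the JVM alive for inspection
    }
}
{code}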



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1947: [FLINK-1502] [WIP] Basic Metric System

2016-07-11 Thread sumitchawla
Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/1947
  
@zentol is there an HTTP interface that can be used to query these metrics? 
Something similar to the existing JobManager accumulator URLs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-3190) Retry rate limits for DataStream API

2016-07-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-3190.
--
Resolution: Implemented

Added via a7274d5661421a66dcb998efd72b547301e1dc0f

> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long-running stream processing job, absolute numbers of retries don't 
> make much sense: the job will accumulate transient errors over time and will 
> eventually die when thresholds are exceeded. Rate limits are better suited to 
> this scenario: a job should only die if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If the operator state changes and/or the operator produces output, we can 
> assume it is making progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, that should not lead to job termination. 
> But if the operator fails 11 times in an hour, or does not produce output 
> between 11 consecutive failures, the job should be terminated.
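
The example above maps fairly directly onto the failure-rate restart strategy 
that was added; a configuration sketch, assuming the Flink 1.1-era 
{{RestartStrategies}} API:

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureRateExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Allow at most 10 failures per hour, with a 10 second delay
        // between restart attempts, matching the example in the issue.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                10,                              // max failures per interval
                Time.of(1, TimeUnit.HOURS),      // measuring interval
                Time.of(10, TimeUnit.SECONDS))); // delay between attempts
    }
}
{code}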



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1501) Integrate metrics library and report basic metrics to JobManager web interface

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371727#comment-15371727
 ] 

ASF GitHub Bot commented on FLINK-1501:
---

Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/421
  
@rmetzger how can I view these metrics? Do I need JMX enabled to view 
them? As of now I only see a few numbers in the metrics tab for the 
TaskManager.


> Integrate metrics library and report basic metrics to JobManager web interface
> --
>
> Key: FLINK-1501
> URL: https://issues.apache.org/jira/browse/FLINK-1501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 0.9
>
>
> As per mailing list, the library: https://github.com/dropwizard/metrics
> The goal of this task is to get the basic infrastructure in place.
> Subsequent issues will integrate more features into the system.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371729#comment-15371729
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1954


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long-running stream processing job, absolute numbers of retries don't 
> make much sense: the job will accumulate transient errors over time and will 
> eventually die when thresholds are exceeded. Rate limits are better suited to 
> this scenario: a job should only die if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If the operator state changes and/or the operator produces output, we can 
> assume it is making progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, that should not lead to job termination. 
> But if the operator fails 11 times in an hour, or does not produce output 
> between 11 consecutive failures, the job should be terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

2016-07-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1954


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #421: [FLINK-1501] Add metrics library for monitoring TaskManage...

2016-07-11 Thread sumitchawla
Github user sumitchawla commented on the issue:

https://github.com/apache/flink/pull/421
  
@rmetzger how can I view these metrics? Do I need JMX enabled to view 
them? As of now I only see a few numbers in the metrics tab for the 
TaskManager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying / cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371524#comment-15371524
 ] 

Gyula Fora commented on FLINK-4193:
---

It happened multiple times in a row on the same cluster, but on different 
machines. It does not always occur, though. The total parallelism that we were 
deploying was about 200. 

Basically, what we did was run a script that deploys 5 jobs one after the 
other with 10 seconds in between. It usually crashed somewhere in the middle.

> Task manager JVM crashes while deploying / cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash, the logs don't show anything, but I have uploaded all the 
> info I got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath, but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371519#comment-15371519
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2225
  
I fixed the remaining comments. I'll merge the changes tomorrow.


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.
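
A sketch of what a shard-aware user schema could look like; the parameter list 
mirrors the deserializer.deserialize(...) call in the PR diff further down, and 
the exact interface shape at this point of the PR is an assumption:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class ShardAwareSchema implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] keyBytes, byte[] dataBytes, String partitionKey,
                              String seqNum, long approxArrivalTimestamp,
                              String streamName, String shardId) {
        // Prefix each record with its origin so downstream operators can
        // tell which stream and shard produced it.
        return streamName + "/" + shardId + ": "
                + new String(dataBytes, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{code}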



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2225: [FLINK-4191] Expose shard information in kinesis deserial...

2016-07-11 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2225
  
I fixed the remaining comments. I'll merge the changes tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70327009
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

Fixed the indentation.
I don't think that having a separate final variable for this will improve 
the performance a lot. The methods aren't computing anything.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371512#comment-15371512
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70327009
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

Fixed the indentation.
I don't think that having a separate final variable for this will improve 
the performance a lot. The methods aren't computing anything.


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying / cancelling jobs

2016-07-11 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371503#comment-15371503
 ] 

Stephan Ewen commented on FLINK-4193:
-

Thanks. Does it occur rarely, or can you more or less reproduce it with a 
certain setup?

> Task manager JVM crashes while deploying / cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash, the logs don't show anything, but I have uploaded all the 
> info I got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath, but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371496#comment-15371496
 ] 

Robert Metzger commented on FLINK-4195:
---

If we are going to rework the configuration, I wonder if it makes sense to use 
a typed configuration class, similar to Amazon's 
{{KinesisProducerConfiguration}} class.
The reason why I chose to use Properties was to have a configuration pattern 
similar to the Kafka connector's.
Also, it's easier to parse configuration from the command line into properties.
But I'm open to discussing this.
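
For illustration, a hedged sketch of how a typed configuration class could 
coexist with the existing Properties-based pattern (the class name follows 
the proposal below; the property keys and defaults here are assumptions):

{code}
import java.io.Serializable;
import java.util.Properties;

// Hypothetical sketch, not the actual connector API: a typed config class
// with setter methods that can also be built from Properties.
public class FlinkKinesisConsumerConfig implements Serializable {

    private String awsRegion = "us-east-1";  // illustrative default
    private int maxRecordsPerFetch = 100;    // illustrative default

    public FlinkKinesisConsumerConfig setAwsRegion(String region) {
        this.awsRegion = region;
        return this;
    }

    public FlinkKinesisConsumerConfig setMaxRecordsPerFetch(int max) {
        if (max <= 0) {
            throw new IllegalArgumentException("maxRecordsPerFetch must be positive");
        }
        this.maxRecordsPerFetch = max;
        return this;
    }

    // Bridge from the Properties-based constructors, so defaulting and
    // validation live in one place instead of being scattered.
    public static FlinkKinesisConsumerConfig fromProperties(Properties props) {
        FlinkKinesisConsumerConfig config = new FlinkKinesisConsumerConfig();
        if (props.containsKey("aws.region")) {             // key is illustrative
            config.setAwsRegion(props.getProperty("aws.region"));
        }
        if (props.containsKey("fetch.maxRecords")) {       // key is illustrative
            config.setMaxRecordsPerFetch(
                    Integer.parseInt(props.getProperty("fetch.maxRecords")));
        }
        return config;
    }
}
{code}

This would keep command-line-friendly Properties parsing while giving direct 
users typed setters with eager validation.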

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation is quite messy and unconsolidated for the current state of the 
> code, and will likely become worse as more configs grow for the Kinesis 
> connector.
> I propose to have a dedicated configuration class (instead of only Java 
> properties) along the lines of Flink's own {{Configuration}}, so that the 
> usage pattern is alike. There will be separate configuration classes for 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3515) Make the "file monitoring source" exactly-once

2016-07-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-3515.
---
Resolution: Duplicate

> Make the "file monitoring source" exactly-once
> --
>
> Key: FLINK-3515
> URL: https://issues.apache.org/jira/browse/FLINK-3515
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.2
>Reporter: Stephan Ewen
>
> The stream source that watches directories for changes is currently not 
> "exactly-once".
> To make it exactly once, the source (that generates files to be read) and the 
> flatMap (that reads the files) need to keep track of where they were at the 
> point of a checkpoint.
> Assuming that files do not change after creation (HDFS / S3 style), we can 
> make this the following way:
>   - The source can track the files it already emitted downstream via file 
> creation/modification timestamp, assuming that new files always get newer 
> timestamps.
>   - The flatMappers need to always store the path of their current file 
> fragment, plus the byte offset where they were within that file split.
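
A rough sketch of the two pieces of checkpoint state described above (class 
and field names are purely illustrative, not actual Flink code):

{code}
import java.io.Serializable;

// State of the monitoring source: files with a modification timestamp
// at or below this value have already been emitted downstream.
public class MonitoringSourceState implements Serializable {
    public long maxEmittedModificationTime;
}

// State of each reading flatMapper: the split being read and the byte
// offset reached within it at checkpoint time.
public class FileReaderState implements Serializable {
    public String currentFilePath;
    public long byteOffset;
}
{code}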



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4196) Remove "recoveryTimestamp"

2016-07-11 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371468#comment-15371468
 ] 

Stephan Ewen edited comment on FLINK-4196 at 7/11/16 7:31 PM:
--

None of Flink's pre-packaged state backends is currently using this.


was (Author: stephanewen):
None of Flink's pre-packages state backends is currently using this.

> Remove "recoveryTimestamp"
> --
>
> Key: FLINK-4196
> URL: https://issues.apache.org/jira/browse/FLINK-4196
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>
> I think we should remove the {{recoveryTimestamp}} that is attached on state 
> restore calls.
> Given that this is a wall clock timestamp from a master node, which may 
> change when clocks are adjusted, and between different master nodes during 
> leader change, this is an unsafe concept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4196) Remove "recoveryTimestamp"

2016-07-11 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371468#comment-15371468
 ] 

Stephan Ewen commented on FLINK-4196:
-

None of Flink's pre-packaged state backends is currently using this.

> Remove "recoveryTimestamp"
> --
>
> Key: FLINK-4196
> URL: https://issues.apache.org/jira/browse/FLINK-4196
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>
> I think we should remove the {{recoveryTimestamp}} that is attached on state 
> restore calls.
> Given that this is a wall clock timestamp from a master node, which may 
> change when clocks are adjusted, and between different master nodes during 
> leader change, this is an unsafe concept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4196) Remove "recoveryTimestamp"

2016-07-11 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4196:
---

 Summary: Remove "recoveryTimestamp"
 Key: FLINK-4196
 URL: https://issues.apache.org/jira/browse/FLINK-4196
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.0.3
Reporter: Stephan Ewen


I think we should remove the {{recoveryTimestamp}} that is attached on state 
restore calls.

Given that this is a wall clock timestamp from a master node, which may change 
when clocks are adjusted, and between different master nodes during leader 
change, this is an unsafe concept.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3085) Move State Backend Initialization from "registerInputOutput()" to "invoke()"

2016-07-11 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3085.
---

> Move State Backend Initialization from "registerInputOutput()" to "invoke()"
> 
>
> Key: FLINK-3085
> URL: https://issues.apache.org/jira/browse/FLINK-3085
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> The state backend initialization currently happens in the {{StreamTask}} in 
> {{registerInputOutput()}}. For better error handling, it should be part of 
> {{invoke()}}, where the task is properly interrupted, threads are properly 
> joined, and exceptions are treated aware of cancelling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3085) Move State Backend Initialization from "registerInputOutput()" to "invoke()"

2016-07-11 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3085.
-
   Resolution: Invalid
Fix Version/s: (was: 1.0.0)

There is no distinction between {{registerInputOutput()}} and {{invoke()}} any 
more.

> Move State Backend Initialization from "registerInputOutput()" to "invoke()"
> 
>
> Key: FLINK-3085
> URL: https://issues.apache.org/jira/browse/FLINK-3085
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> The state backend initialization currently happens in the {{StreamTask}} in 
> {{registerInputOutput()}}. For better error handling, it should be part of 
> {{invoke()}}, where the task is properly interrupted, threads are properly 
> joined, and exceptions are treated aware of cancelling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4195:
---
Fix Version/s: 1.1.0

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation is quite messy and unconsolidated for the current state of the 
> code, and will likely become worse as more configs grow for the Kinesis 
> connector.
> I propose to have a dedicated configuration class (instead of only Java 
> properties) along the lines of Flink's own {{Configuration}}, so that the 
> usage pattern is alike. There will be separate configuration classes for 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3466) Job might get stuck in restoreState() from HDFS due to interrupt

2016-07-11 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-3466:
---

Assignee: Stephan Ewen

> Job might get stuck in restoreState() from HDFS due to interrupt
> 
>
> Key: FLINK-3466
> URL: https://issues.apache.org/jira/browse/FLINK-3466
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.0, 0.10.2
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task  
>- Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck 
> in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and 
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task  
>- Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck 
> in method:
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783)
> java.lang.Throwable.<init>(Throwable.java:250)
> java.lang.Exception.<init>(Exception.java:54)
> java.lang.InterruptedException.<init>(InterruptedException.java:57)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> 

[jira] [Updated] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4195:
---
Component/s: Kinesis Connector

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation is quite messy and unconsolidated for the current state of the 
> code, and will likely become worse as more configs grow for the Kinesis 
> connector.
> I propose to have a dedicated configuration class (instead of only Java 
> properties) along the lines of Flink's own {{Configuration}}, so that the 
> usage pattern is alike. There will be separate configuration classes for 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4195:
---
Affects Version/s: 1.1.0

> Dedicated Configuration classes for Kinesis Consumer / Producer
> ---
>
> Key: FLINK-4195
> URL: https://issues.apache.org/jira/browse/FLINK-4195
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> While fixing FLINK-4170, I feel that configuration and default value setting 
> & validation is quite messy and unconsolidated for the current state of the 
> code, and will likely become worse as more configs grow for the Kinesis 
> connector.
> I propose to have a dedicated configuration class (instead of only Java 
> properties) along the lines of Flink's own {{Configuration}}, so that the 
> usage pattern is alike. There will be separate configuration classes for 
> {{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.
> [~uce] [~rmetzger] What do you think? This will break the interface, so if 
> we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 
> 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4195) Dedicated Configuration classes for Kinesis Consumer / Producer

2016-07-11 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4195:
--

 Summary: Dedicated Configuration classes for Kinesis Consumer / 
Producer
 Key: FLINK-4195
 URL: https://issues.apache.org/jira/browse/FLINK-4195
 Project: Flink
  Issue Type: Sub-task
Reporter: Tzu-Li (Gordon) Tai


While fixing FLINK-4170, I feel that configuration and default value setting & 
validation is quite messy and unconsolidated for the current state of the code, 
and will likely become worse as more configs grow for the Kinesis connector.

I propose to have a dedicated configuration class (instead of only Java 
properties) along the lines of Flink's own {{Configuration}}, so that the usage 
pattern is alike. There will be separate configuration classes for 
{{FlinkKinesisConsumerConfig}} and {{FlinkKinesisProducerConfig}}.

[~uce] [~rmetzger] What do you think? This will break the interface, so if 
we're to change this, I'd prefer to fix it along with FLINK-4170 for Flink 1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371342#comment-15371342
 ] 

Aljoscha Krettek commented on FLINK-4190:
-

I'm very happy you're working on this! What I would suggest is to leave the 
existing {{RollingSink}} as is and deprecate it, while adding the new sink under 
the {{BucketingSink}} name. I don't like breaking code of people that are 
already using the current sink, and we can remove it once Flink 2.0 is released.

What do you think?

> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.
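
To make the generalization concrete, a hypothetical sketch of a bucketer 
abstraction addressing the three limitations above (names are illustrative, 
not the final API; {{MyEvent}} and {{getCountryCode()}} are invented for the 
example):

{code}
import java.io.Serializable;

// "Arbitrary buckets" boils down to a user-supplied function from an
// element to the bucket it belongs to, instead of the current system time.
public interface Bucketer<T> extends Serializable {
    // Returns the bucket (directory) path the given element should go to.
    String getBucketPath(T element);
}

// Example: bucket by an attribute of the element being processed.
public class CountryBucketer implements Bucketer<MyEvent> {
    @Override
    public String getBucketPath(MyEvent event) {
        return "country=" + event.getCountryCode();
    }
}
{code}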



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371330#comment-15371330
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70308410
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

`subscribedShard.getStreamName()` and 
`subscribedShard.getShard().getShardId()` will be the same for all records 
collected in the while loop. Probably won't have a big performance difference, 
but perhaps it would be better to have `final` values for these two to point 
out this fact?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70308410
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

`subscribedShard.getStreamName()` and 
`subscribedShard.getShard().getShardId()` will be the same for all records 
collected in the while loop. Probably won't have a big performance difference, 
but perhaps it would be better to have `final` values for these two to point 
out this fact?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70305936
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -52,5 +54,5 @@
 * @param nextElement the element to test for the end-of-stream signal
 * @return true if the element signals end of stream, false otherwise
 */
-   boolean isEndOfStream(T nextElement);
+   // TODO ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
--- End diff --

Ah, I just noticed the JIRA for this. Let's leave it as it is then, and 
uncomment it for the fix on that JIRA.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371289#comment-15371289
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70305936
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -52,5 +54,5 @@
 * @param nextElement the element to test for the end-of-stream signal
 * @return true if the element signals end of stream, false otherwise
 */
-   boolean isEndOfStream(T nextElement);
+   // TODO ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
--- End diff --

Ah, I just noticed the JIRA for this. Let's leave it as it is then, and 
uncomment it for the fix on that JIRA.


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371284#comment-15371284
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70305142
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -52,5 +54,5 @@
 * @param nextElement the element to test for the end-of-stream signal
 * @return true if the element signals end of stream, false otherwise
 */
-   boolean isEndOfStream(T nextElement);
+   // TODO ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
--- End diff --

Not sure if we should have commented-out code. Open up a JIRA for this 
instead?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70305142
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -52,5 +54,5 @@
 * @param nextElement the element to test for the end-of-stream signal
 * @return true if the element signals end of stream, false otherwise
 */
-   boolean isEndOfStream(T nextElement);
+   // TODO ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
--- End diff --

Not sure if we should have commented-out code. Open up a JIRA for this 
instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70304158
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
 ---
@@ -37,15 +37,15 @@ public 
KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializat
}
 
@Override
-   public T deserialize(byte[] recordKey, byte[] recordValue, String 
stream, String seqNum, long approxArrivalTimestamp)
+   public T deserialize(byte[] recordKey, byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId)
throws IOException {
return deserializationSchema.deserialize(recordValue);
}
 
-   @Override
+   /*@Override
public boolean isEndOfStream(T nextElement) {
return deserializationSchema.isEndOfStream(nextElement);
-   }
+   } */
--- End diff --

Not sure if we should have commented-out code. Open up a JIRA for this 
instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371278#comment-15371278
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70304158
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
 ---
@@ -37,15 +37,15 @@ public 
KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializat
}
 
@Override
-   public T deserialize(byte[] recordKey, byte[] recordValue, String 
stream, String seqNum, long approxArrivalTimestamp)
+   public T deserialize(byte[] recordKey, byte[] recordValue, String 
partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String 
shardId)
throws IOException {
return deserializationSchema.deserialize(recordValue);
}
 
-   @Override
+   /*@Override
public boolean isEndOfStream(T nextElement) {
return deserializationSchema.isEndOfStream(nextElement);
-   }
+   } */
--- End diff --

Not sure if we should have commented-out code. Open up a JIRA for this 
instead?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2225: [FLINK-4191] Expose shard information in kinesis deserial...

2016-07-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
Good points on not using Amazon's `Shard` and `Record`, did not think of 
that!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371279#comment-15371279
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
Good points on not using Amazon's `Shard` and `Record`, did not think of 
that!



> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70303482
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -37,13 +37,15 @@
 *
 * @param recordKey the records's key as a byte array (null if no key 
has been set for the record)
 * @param recordValue the record's value as a byte array
-* @param stream the name of the Kinesis stream that this record was 
sent to
+* @param partitionKey the record's partition key at the time of 
writing.
 * @param seqNum the sequence number of this record in the Kinesis shard
 * @param approxArrivalTimestamp the server-side timestamp of when 
Kinesis received and stored the record
+* @param stream the name of the Kinesis stream that this record was 
sent to
+* @param shardId The identifier of the shard the record was sent to.
--- End diff --

nit: some param descriptions end with a period, some don't. I think the 
others all don't end with a period, so let's keep consistent with that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371269#comment-15371269
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70303482
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -37,13 +37,15 @@
 *
 * @param recordKey the records's key as a byte array (null if no key 
has been set for the record)
 * @param recordValue the record's value as a byte array
-* @param stream the name of the Kinesis stream that this record was 
sent to
+* @param partitionKey the record's partition key at the time of 
writing.
 * @param seqNum the sequence number of this record in the Kinesis shard
 * @param approxArrivalTimestamp the server-side timestamp of when 
Kinesis received and stored the record
+* @param stream the name of the Kinesis stream that this record was 
sent to
+* @param shardId The identifier of the shard the record was sent to.
--- End diff --

nit: some param descriptions end with a period, some don't. I think the 
others all don't end with a period, so let's keep consistent with that.


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371264#comment-15371264
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70303112
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

Are these supposed to be indented with two tabs?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70303112
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -206,7 +206,13 @@ private void 
deserializeRecordForCollectionAndUpdateState(UserRecord record)
final long approxArrivalTimestamp = 
record.getApproximateArrivalTimestamp().getTime();
 
final T value = deserializer.deserialize(
-   keyBytes, dataBytes, subscribedShard.getStreamName(), 
record.getSequenceNumber(), approxArrivalTimestamp);
+   keyBytes,
+   dataBytes,
+   record.getPartitionKey(),
+   record.getSequenceNumber(),
+   approxArrivalTimestamp,
+   subscribedShard.getStreamName(),
+   subscribedShard.getShard().getShardId());
--- End diff --

Are these supposed to be indented with two tabs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-07-11 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371216#comment-15371216
 ] 

Sebastian Klemke commented on FLINK-4015:
-

For our use case, buffering the in-flight records and replaying the whole 
buffer in order from the first failed message would be optimal.
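
For illustration, a very simplified, single-threaded sketch of that 
buffering/replay idea (class and method names are hypothetical; the real 
producer is asynchronous, so batching, callbacks, and metadata refresh are 
ignored here):

{code}
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Keeps every record until it is acknowledged; on a retriable error such
// as NotLeaderForPartitionException, the whole buffer is replayed in the
// original order instead of failing the job.
public class BufferingSender {

    private final Deque<ProducerRecord<byte[], byte[]>> inFlight = new ArrayDeque<>();

    public void send(KafkaProducer<byte[], byte[]> producer,
            ProducerRecord<byte[], byte[]> record) {
        inFlight.addLast(record);  // remember until acknowledged
        producer.send(record);     // ack handling elided
    }

    // Would be called once the leader change has settled.
    public void replayInFlight(KafkaProducer<byte[], byte[]> producer) {
        for (ProducerRecord<byte[], byte[]> record : inFlight) {
            producer.send(record); // re-send in original order
        }
    }

    // Would be called when the oldest record is acknowledged successfully.
    public void acknowledgeOldest() {
        inFlight.pollFirst();
    }
}
{code}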

> FlinkKafkaProducer08 fails when partition leader changes
> 
>
> Key: FLINK-4015
> URL: https://issues.apache.org/jira/browse/FLINK-4015
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.2
>Reporter: Sebastian Klemke
>
> When leader for a partition changes, producer fails with the following 
> exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager   
>- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to 
> FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at OPERATOR.flatMap2(OPERATOR.java:82)
>   at OPERATOR.flatMap2(OPERATOR.java:16)
>   at 
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is 
> not the leader for that topic-partition.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371204#comment-15371204
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2225
  
I added the javadoc and did some other changes:
- Commented out the `isEndOfStream()` method, because it's not respected.
- Re-ordered the arguments of the deser schema. The first group is 
record-specific, the second one stream/shard-specific.
I also removed the `Shard` from the list of arguments again. If we are 
going to shade Kinesis at some point, we'll need to break the interface. Also, 
we are relying on Amazon not to change the class. I think now we are on the 
safe side.
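
For reference, a hedged sketch of a user schema against the re-ordered 
signature quoted in the diffs above (it assumes the interface also exposes 
{{getProducedType()}}, as Flink's other deserialization schemas do):

{code}
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

// Record-specific arguments come first, stream/shard-specific ones last.
public class ShardAwareSchema implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(
            byte[] recordKey,
            byte[] recordValue,
            String partitionKey,
            String seqNum,
            long approxArrivalTimestamp,
            String stream,
            String shardId) throws IOException {
        // Tag each payload with its origin, e.g. for debugging re-sharding.
        return stream + "/" + shardId + ": " + new String(recordValue, "UTF-8");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{code}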


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2225: [FLINK-4191] Expose shard information in kinesis deserial...

2016-07-11 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2225
  
I added the javadoc and did some other changes:
- Commented out the `isEndOfStream()` method, because it's not respected.
- Re-ordered the arguments of the deser schema. The first group is 
record-specific, the second one stream/shard-specific.
I also removed the `Shard` from the list of arguments again. If we are 
going to shade Kinesis at some point, we'll need to break the interface. Also, 
we are relying on Amazon not to change the class. I think now we are on the 
safe side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called

2016-07-11 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4194:
-

 Summary: KinesisDeserializationSchema.isEndOfStream() is never 
called
 Key: FLINK-4194
 URL: https://issues.apache.org/jira/browse/FLINK-4194
 Project: Flink
  Issue Type: Sub-task
  Components: Kinesis Connector
Affects Versions: 1.1.0
Reporter: Robert Metzger


The Kinesis connector does not respect the 
{{KinesisDeserializationSchema.isEndOfStream()}} method.

The purpose of this method is to stop consuming from a source, based on input 
data.
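
For context, a hedged sketch of the kind of logic this method is meant to 
enable inside a {{KinesisDeserializationSchema<String>}} implementation (the 
sentinel value is purely illustrative):

{code}
// Once respected, returning true here would make the consumer stop
// consuming from the source, driven entirely by the input data.
@Override
public boolean isEndOfStream(String nextElement) {
    return "END_OF_STREAM".equals(nextElement);
}
{code}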



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371134#comment-15371134
 ] 

Gyula Fora commented on FLINK-4193:
---

We are using a pretty recent version, just a couple of days old. The server is 
at commit 3ab9e36 (Jul 7).

> Task manager JVM crashes while deploying cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371139#comment-15371139
 ] 

Gyula Fora commented on FLINK-4193:
---

I know that RocksDB is bundled within the application jar, but I also built 
that yesterday or two days ago, and assuming that Maven has a pretty much 
up-to-date snapshot version, this should not be a problem.

> Task manager JVM crashes while deploying cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4159) Quickstart poms exclude unused dependencies

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371035#comment-15371035
 ] 

ASF GitHub Bot commented on FLINK-4159:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2217
  
+1 to merge


> Quickstart poms exclude unused dependencies
> ---
>
> Key: FLINK-4159
> URL: https://issues.apache.org/jira/browse/FLINK-4159
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.1.0
>
>
> The Quickstart poms exclude several dependencies from being packaged into the 
> fat-jar, even though they aren't used by Flink according to `mvn 
> dependency:tree`.
> com.amazonaws:aws-java-sdk
> com.twitter:chill-avro_*
> com.twitter:chill-bijection_*
> com.twitter:bijection-core_*
> com.twitter:bijection-avro_*
> de.javakaffee:kryo-serializers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2217: [FLINK-4159] Remove Quickstart exclusions for unused depe...

2016-07-11 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2217
  
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-07-11 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371021#comment-15371021
 ] 

Stephan Ewen commented on FLINK-4193:
-

On which version are you? I remember that [~aljoscha] fixed some concurrency 
issue a while back. The issue was that the RocksDB code still used native 
handles after disposal.

> Task manager JVM crashes while deploying cancelling jobs
> 
>
> Key: FLINK-4193
> URL: https://issues.apache.org/jira/browse/FLINK-4193
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, TaskManager
>Reporter: Gyula Fora
>Priority: Critical
>
> We have observed several TM crashes while deploying larger stateful streaming 
> jobs that use the RocksDB state backend.
> As the JVMs crash the logs don't show anything but I have uploaded all the 
> info I have got from the standard output.
> This indicates some GC and possibly some RocksDB issues underneath but we 
> could not really figure out much more.
> GC segfault
> https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125
> Other crashes (maybe rocks related)
> https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
> https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818
> The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4154) Correction of murmur hash breaks backwards compatibility

2016-07-11 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-4154.
-
Resolution: Fixed

Fixed in 81cf2296683a473db4061dd3bed0aeb249e05058

> Correction of murmur hash breaks backwards compatibility
> 
>
> Key: FLINK-4154
> URL: https://issues.apache.org/jira/browse/FLINK-4154
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash with commit [1] breaks Flink's 
> backwards compatibility with respect to savepoints. The reason is that the 
> changed murmur hash which is used to partition elements in a {{KeyedStream}} 
> changes the mapping from keys to sub tasks. This changes the assigned key 
> spaces for a sub task. Consequently, an old savepoint (version 1.0) assigns 
> states with a different key space to the sub tasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two 
> options to solve the problem:
> -  revert the changes, but then we don't know how the flawed murmur hash 
> performs
> - develop tooling to repartition state of old savepoints. This is probably 
> not trivial since a keyed stream can also contain non-partitioned state which 
> is not partitionable in all cases. And even if only partitioned state is 
> used, we would need some kind of special operator which can repartition the 
> state wrt the key.
> I think that the latter option requires some more thoughts and is thus 
> unlikely to be done before the release 1.1. Therefore, as a workaround, I 
> think that we should revert the murmur hash changes.
> [1] 
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee
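
To make the mechanism concrete, a simplified stand-in for the key-to-subtask 
routing of a {{KeyedStream}} (not the actual channel-selection code):

{code}
import org.apache.flink.util.MathUtils;

// The target subtask is a pure function of the key's hash. Changing the
// hash function (as the murmur hash fix in [1] did) therefore reassigns
// keys to different subtasks and invalidates the per-subtask key spaces
// captured in an old savepoint.
public final class KeyRouting {

    public static int targetSubtask(Object key, int parallelism) {
        return MathUtils.murmurHash(key.hashCode()) % parallelism;
    }
}
{code}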



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4154) Correction of murmur hash breaks backwards compatibility

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370961#comment-15370961
 ] 

ASF GitHub Bot commented on FLINK-4154:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2223


> Correction of murmur hash breaks backwards compatibility
> 
>
> Key: FLINK-4154
> URL: https://issues.apache.org/jira/browse/FLINK-4154
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
>Priority: Blocker
> Fix For: 1.1.0
>
>
> The correction of Flink's murmur hash with commit [1] breaks Flink's 
> backwards compatibility with respect to savepoints. The reason is that the 
> changed murmur hash which is used to partition elements in a {{KeyedStream}} 
> changes the mapping from keys to sub tasks. This changes the assigned key 
> spaces for a sub task. Consequently, an old savepoint (version 1.0) assigns 
> states with a different key space to the sub tasks.
> I think that this must be fixed for the upcoming 1.1 release. I see two 
> options to solve the problem:
> -  revert the changes, but then we don't know how the flawed murmur hash 
> performs
> - develop tooling to repartition state of old savepoints. This is probably 
> not trivial since a keyed stream can also contain non-partitioned state which 
> is not partitionable in all cases. And even if only partitioned state is 
> used, we would need some kind of special operator which can repartition the 
> state wrt the key.
> I think that the latter option requires some more thoughts and is thus 
> unlikely to be done before the release 1.1. Therefore, as a workaround, I 
> think that we should revert the murmur hash changes.
> [1] 
> https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2223: [FLINK-4154] [core] Correction of murmur hash brea...

2016-07-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2223


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems

2016-07-11 Thread Saliya Ekanayake (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370933#comment-15370933
 ] 

Saliya Ekanayake commented on FLINK-3163:
-

In the standalone cluster setup, doing this with the start scripts would be 
relatively easy. The slaves file could support a format like "IP N 
TM-bind-to-resource Slot-bind-to-resource", where N is the number of TMs to 
spawn on the particular host. I haven't looked into the workings of slots, but 
TMs are JVM processes, so it's possible to prefix the start command with 
numactl or taskset for pinning.

> Configure Flink for NUMA systems
> 
>
> Key: FLINK-3163
> URL: https://issues.apache.org/jira/browse/FLINK-3163
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> On NUMA systems Flink can be pinned to a single physical processor ("node") 
> using {{numactl --membind=$node --cpunodebind=$node <command>}}. Commonly 
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could 
> configure a single TaskManager with 36 slots, or have Flink create two 
> TaskManagers, one bound to each NUMA node, each with 18 slots.
> There may be some extra overhead in transferring network buffers between 
> TaskManagers on the same system, though the fraction of data shuffled in this 
> manner decreases with the size of the cluster. The performance improvement 
> from only accessing local memory looks to be significant though difficult to 
> benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

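A minimal sketch of the pinning idea from the comment above: prefix the 
TaskManager start command with numactl so the process is bound to one NUMA 
node. The launcher class, the node index, and the command are illustrative 
assumptions; the real start commands live in Flink's bin/ scripts.

{code}
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NumaPinnedLaunch {

    // Prefix an arbitrary start command with numactl to pin the process
    // (memory and CPUs) to a single NUMA node.
    static Process launchOnNode(int node, List<String> command) throws IOException {
        List<String> cmd = new ArrayList<>();
        cmd.add("numactl");
        cmd.add("--membind=" + node);
        cmd.add("--cpunodebind=" + node);
        cmd.addAll(command);
        return new ProcessBuilder(cmd).inheritIO().start();
    }

    public static void main(String[] args) throws IOException {
        // Illustrative TM start command; flags and paths are assumptions.
        launchOnNode(0, Arrays.asList("bash", "bin/taskmanager.sh", "start"));
    }
}
{code}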

[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370932#comment-15370932
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
Btw, should we also add the info from `Record#getPartitionKey()` to 
the deserialization schema? It might be out of the scope of this issue, but we 
might as well now that we're at it?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2225: [FLINK-4191] Expose shard information in kinesis deserial...

2016-07-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
Btw, should we also add the info from `Record#getPartitionKey()` to 
the deserialization schema? It might be out of the scope of this issue, but we 
might as well now that we're at it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370918#comment-15370918
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
LGTM, +1 to merge the change after travis turns green + minor Javadoc 
update.


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370921#comment-15370921
 ] 

Josh Forman-Gornall edited comment on FLINK-4190 at 7/11/16 2:58 PM:
-

I needed this feature and already have a working solution - the changes I made 
can be found here:
https://github.com/joshfg/flink/tree/flink-4190

If people think these changes are good and will be useful to others, I will 
submit a PR.

There are three main changes:
- The Bucketer interface now takes the sink's input element as a generic 
parameter, enabling us to bucket based on attributes of the sink's input.
- While maintaining the same rolling mechanics of the existing implementation 
(e.g. rolling when the file size reaches a threshold), the sink implementation 
can now have many 'active' buckets at any point in time. The checkpointing 
mechanics have been extended to support maintaining the state of multiple 
active buckets and files, instead of just one.
- For use cases where the buckets being written to are changing over time, the 
sink now needs to determine when a bucket has become 'inactive', in order to 
flush and close the file. In the existing implementation, this is simply when 
the bucket path changes. Instead, we now determine a bucket as inactive if it 
hasn't been written to recently. To support this there are two additional user 
configurable settings: inactiveBucketCheckInterval and inactiveBucketThreshold.

Also, I've renamed RollingSink to BucketingSink to reflect its more general use.

Any comments are welcome!



was (Author: joshfg):
I needed this feature and already have a working solution - the changes I made 
can be found here:
https://github.com/joshfg/flink/tree/flink-4190

If people think these changes are good and will be useful to others, I will 
submit a PR.

There are three main changes:
- The Bucketer interface now takes the sink's input element as a generic 
parameter, enabling us to bucket based on attributes of the sink's input.
- While maintaining the same rolling mechanics of the existing implementation 
(e.g. rolling when the file size reaches a threshold), the sink implementation 
can now have many 'active' buckets at any point in time. The checkpointing 
mechanics have been extended to support maintaining the state of multiple 
active buckets and files, instead of just one.
- For use cases where the buckets being written to are changing over time, the 
sink now needs to determine when a bucket has become 'inactive', in order to 
flush and close the file. In the existing implementation, this is simply when 
the bucket path changes. Instead, we now determine a bucket as inactive if it 
hasn't been written to recently. To support this there are two additional user 
configurable settings: inactiveBucketCheckInterval and inactiveBucketThreshold.

Also, I've renamed RollingSink to BucketingSink to reflect its more general use.

Any comments are welcome, thanks!


> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

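A hedged sketch of the element-aware {{Bucketer}} described in the comment 
above. The interface shape is read off the description of the linked branch and 
is not a settled API; the event type and all names are invented for 
illustration.

{code}
import java.io.Serializable;

import org.apache.hadoop.fs.Path;

// Illustrative event type, invented for this example.
class MyEvent implements Serializable {
    private final String category;
    MyEvent(String category) { this.category = category; }
    String getCategory() { return category; }
}

// The proposed generalisation: the sink's input type is a generic parameter,
// so the bucket can be chosen from the element itself instead of system time.
interface Bucketer<T> extends Serializable {
    Path getBucketPath(Path basePath, T element);
}

// Bucket by an attribute of the element.
class CategoryBucketer implements Bucketer<MyEvent> {
    @Override
    public Path getBucketPath(Path basePath, MyEvent element) {
        return new Path(basePath, element.getCategory());
    }
}
{code}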

[GitHub] flink issue #2225: [FLINK-4191] Expose shard information in kinesis deserial...

2016-07-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2225
  
LGTM, +1 to merge the change after travis turns green + minor Javadoc 
update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370921#comment-15370921
 ] 

Josh Forman-Gornall commented on FLINK-4190:


I needed this feature and already have a working solution - the changes I made 
can be found here:
https://github.com/joshfg/flink/tree/flink-4190

If people think these changes are good and will be useful to others, I will 
submit a PR.

There are three main changes:
- The Bucketer interface now takes the sink's input element as a generic 
parameter, enabling us to bucket based on attributes of the sink's input.
- While maintaining the same rolling mechanics of the existing implementation 
(e.g. rolling when the file size reaches a threshold), the sink implementation 
can now have many 'active' buckets at any point in time. The checkpointing 
mechanics have been extended to support maintaining the state of multiple 
active buckets and files, instead of just one.
- For use cases where the buckets being written to are changing over time, the 
sink now needs to determine when a bucket has become 'inactive', in order to 
flush and close the file. In the existing implementation, this is simply when 
the bucket path changes. Instead, we now determine a bucket as inactive if it 
hasn't been written to recently. To support this there are two additional user 
configurable settings: inactiveBucketCheckInterval and inactiveBucketThreshold.

Also, I've renamed RollingSink to BucketingSink to reflect its more general use.

Any comments are welcome, thanks!


> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Josh Forman-Gornall (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Forman-Gornall updated FLINK-4190:
---
Comment: was deleted

(was: I needed this feature and already have a working solution - will submit a 
PR today (comments welcome!))

> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370889#comment-15370889
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70269570
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -43,7 +44,7 @@
 * @return the deserialized message as a Java object
 * @throws IOException
 */
-   T deserialize(byte[] recordKey, byte[] recordValue, String stream, 
String seqNum, long approxArrivalTimestamp) throws IOException;
+   T deserialize(byte[] recordKey, byte[] recordValue, String stream, 
Shard shard, String seqNum, long approxArrivalTimestamp) throws IOException;
--- End diff --

Can you also update the Javadoc for this method?


> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2225#discussion_r70269570
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
 ---
@@ -43,7 +44,7 @@
 * @return the deserialized message as a Java object
 * @throws IOException
 */
-   T deserialize(byte[] recordKey, byte[] recordValue, String stream, 
String seqNum, long approxArrivalTimestamp) throws IOException;
+   T deserialize(byte[] recordKey, byte[] recordValue, String stream, 
Shard shard, String seqNum, long approxArrivalTimestamp) throws IOException;
--- End diff --

Can you also update the Javadoc for this method?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

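To illustrate the reviewed change, here is a hedged sketch of a schema 
implementing {{deserialize}} with the signature shown in the diff above. It 
assumes the interface also requires {{getProducedType()}} (via 
{{ResultTypeQueryable}}), as in the connector at the time; the class name and 
the tagging logic are invented.

{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

public class ShardTaggingSchema implements KinesisDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] recordKey, byte[] recordValue, String stream,
                              Shard shard, String seqNum, long approxArrivalTimestamp)
            throws IOException {
        // Tag each record with the shard it was read from.
        return shard.getShardId() + ": " + new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{code}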

[jira] [Commented] (FLINK-4192) Move Metrics API to separate module

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370885#comment-15370885
 ] 

ASF GitHub Bot commented on FLINK-4192:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2226
  
Note that this PR will cause a few issues with other metric related Pull 
Requests.


> Move Metrics API to separate module
> ---
>
> Key: FLINK-4192
> URL: https://issues.apache.org/jira/browse/FLINK-4192
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> All metrics code currently resides in flink-core. If a user implements a 
> reporter and wants a fat jar it will now have to include the entire 
> flink-core module.
> Instead, we could move several interfaces into a separate module.
> These interfaces to move include:
> * Counter, Gauge, Histogram(Statistics)
> * MetricGroup
> * MetricReporter, Scheduled, AbstractReporter
> In addition a new MetricRegistry interface will be required as well as a 
> replacement for the Configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2226: [FLINK-4192] - Move Metrics API to separate module

2016-07-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2226
  
Note that this PR will cause a few issues with other metric related Pull 
Requests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370883#comment-15370883
 ] 

ASF GitHub Bot commented on FLINK-3943:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2169
  
@twalthr Thank you for accepting my changes!


> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input if the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

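The description above explains the translation idea: coGroup both inputs on 
all fields and forward a record of the first input only when the second 
input's group is empty. A hedged sketch of that idea in the plain DataSet API 
(not the actual {{DataSetMinus}} translation):

{code}
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class ExceptSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> left = env.fromElements("a", "b", "c");
        DataSet<String> right = env.fromElements("b");

        DataSet<String> minus = left.coGroup(right)
            // "coGroup on all fields": use the whole record as the key.
            .where(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) { return value; }
            })
            .equalTo(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) { return value; }
            })
            .with(new CoGroupFunction<String, String, String>() {
                @Override
                public void coGroup(Iterable<String> first, Iterable<String> second,
                                    Collector<String> out) {
                    // Forward left records only if the right group is empty.
                    if (!second.iterator().hasNext()) {
                        for (String value : first) {
                            out.collect(value);
                        }
                    }
                }
            });

        minus.print(); // prints a and c
    }
}
{code}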

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

2016-07-11 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2169
  
@twalthr Thank you for accepting my changes!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4192) Move Metrics API to separate module

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370881#comment-15370881
 ] 

ASF GitHub Bot commented on FLINK-4192:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/2226

[FLINK-4192] - Move Metrics API to separate module

This PR moves several metrics-related classes into a new module.

The new module is located under `flink-metrics/flink-metrics-core`.

Most of the changes are related to
* renaming of `MetricRegistry` to `InternalMetricRegistry`
 * a new `MetricRegistry` interface was added
* replacing usages of `Configuration` with a new `MetricConfig`

Changes unrelated to the above are as follows:
* obviously, creating a new module and adjusting dependencies
* some tests were refactored to properly close the registry
* `getScopeComponents/-String()` was moved from `AbstractMetricGroup` to 
`MetricGroup`
* fixed a small typo in the `Scheduled` interface

I have verified that it compiles and works; the tests in flink-core pass, 
for example. I'll leave the rest to travis.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink metrics_4th_of_july

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2226.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2226






> Move Metrics API to separate module
> ---
>
> Key: FLINK-4192
> URL: https://issues.apache.org/jira/browse/FLINK-4192
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> All metrics code currently resides in flink-core. If a user implements a 
> reporter and wants a fat jar it will now have to include the entire 
> flink-core module.
> Instead, we could move several interfaces into a separate module.
> These interfaces to move include:
> * Counter, Gauge, Histogram(Statistics)
> * MetricGroup
> * MetricReporter, Scheduled, AbstractReporter
> In addition a new MetricRegistry interface will be required as well as a 
> replacement for the Configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

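A hedged sketch of what a user-defined reporter looks like against the layout 
described in the PR above: only types from {{flink-metrics-core}} are 
referenced, and {{open()}} receives the new {{MetricConfig}} instead of 
{{Configuration}}. The reporter itself (a stdout logger) and its config key 
are invented for illustration.

{code}
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class StdoutReporter implements MetricReporter {

    private String prefix;

    @Override
    public void open(MetricConfig config) {
        // MetricConfig replaces Configuration for reporter settings.
        prefix = config.getString("prefix", "flink");
    }

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        System.out.println(prefix + ": added " + metricName);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        System.out.println(prefix + ": removed " + metricName);
    }
}
{code}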

[GitHub] flink pull request #2226: [FLINK-4192] - Move Metrics API to separate module

2016-07-11 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/2226

[FLINK-4192] - Move Metrics API to separate module

This PR moves several metrics-related classes into a new module.

The new module is located under `flink-metrics/flink-metrics-core`.

Most of the changes are related to
* renaming of `MetricRegistry` to `InternalMetricRegistry`
 * a new `MetricRegistry` interface was added
* replacing usages of `Configuration` with a new `MetricConfig`

Changes unrelated to the above are as follows:
* obviously, creating a new module and adjusting dependencies
* some tests were refactored to properly close the registry
* `getScopeComponents/-String()` was moved from `AbstractMetricGroup` to 
`MetricGroup`
* fixed a small typo in the `Scheduled` interface

I have verified that it compiles and works; the tests in flink-core pass, 
for example. I'll leave the rest to travis.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink metrics_4th_of_july

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2226.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2226






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4193) Task manager JVM crashes while deploying cancelling jobs

2016-07-11 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4193:
-

 Summary: Task manager JVM crashes while deploying cancelling jobs
 Key: FLINK-4193
 URL: https://issues.apache.org/jira/browse/FLINK-4193
 Project: Flink
  Issue Type: Bug
  Components: Streaming, TaskManager
Reporter: Gyula Fora
Priority: Critical


We have observed several TM crashes while deploying larger stateful streaming 
jobs that use the RocksDB state backend.

As the JVMs crash the logs don't show anything but I have uploaded all the info 
I have got from the standard output.

This indicates some GC and possibly some RocksDB issues underneath but we could 
not really figure out much more.

GC segfault
https://gist.github.com/gyfora/9e56d4a0d4fc285a8d838e1b281ae125

Other crashes (maybe rocks related)
https://gist.github.com/gyfora/525c67c747873f0ff2ff2ed1682efefa
https://gist.github.com/gyfora/b93611fde87b1f2516eeaf6bfbe8d818

The third link shows 2 issues that happened in parallel...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4192) Move Metrics API to separate module

2016-07-11 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4192:
---

 Summary: Move Metrics API to separate module
 Key: FLINK-4192
 URL: https://issues.apache.org/jira/browse/FLINK-4192
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.1.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.1.0


All metrics code currently resides in flink-core. If a user implements a 
reporter and wants a fat jar it will now have to include the entire flink-core 
module.

Instead, we could move several interfaces into a separate module.

These interfaces to move include:
* Counter, Gauge, Histogram(Statistics)
* MetricGroup
* MetricReporter, Scheduled, AbstractReporter

In addition a new MetricRegistry interface will be required as well as a 
replacement for the Configuration.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2225: [FLINK-4191] Expose shard information in kinesis d...

2016-07-11 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2225

[FLINK-4191] Expose shard information in kinesis deserialization schema

@tzulitai please review the change.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4191

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2225.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2225


commit ed0053f41dca5d1bcd61cbb30a4c1c0750f6ff9b
Author: Robert Metzger 
Date:   2016-07-11T12:54:41Z

[FLINK-4191] Expose shard information in kinesis deserialization schema




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370701#comment-15370701
 ] 

ASF GitHub Bot commented on FLINK-4191:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2225

[FLINK-4191] Expose shard information in kinesis deserialization schema

@tzulitai please review the change.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink4191

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2225.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2225


commit ed0053f41dca5d1bcd61cbb30a4c1c0750f6ff9b
Author: Robert Metzger 
Date:   2016-07-11T12:54:41Z

[FLINK-4191] Expose shard information in kinesis deserialization schema




> Expose shard information in KinesisDeserializationSchema
> 
>
> Key: FLINK-4191
> URL: https://issues.apache.org/jira/browse/FLINK-4191
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> Currently, we are not exposing the Shard ID and other shard-related 
> information in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4191) Expose shard information in KinesisDeserializationSchema

2016-07-11 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4191:
-

 Summary: Expose shard information in KinesisDeserializationSchema
 Key: FLINK-4191
 URL: https://issues.apache.org/jira/browse/FLINK-4191
 Project: Flink
  Issue Type: Sub-task
  Components: Kinesis Connector, Streaming Connectors
Affects Versions: 1.1.0
Reporter: Robert Metzger
Assignee: Robert Metzger
 Fix For: 1.1.0


Currently, we are not exposing the Shard ID and other shard-related information 
in the deserialization schema.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370686#comment-15370686
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/1954
  
The changes look good to me. Thanks a lot for your work @fijolekProjects 
:-) Will be merging your PR.


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, the job should be terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

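The example in the description (at most 10 retries per hour) maps directly 
onto the failure rate restart strategy this PR adds. A hedged sketch of the 
user-facing configuration, assuming the API as merged; the 10-second delay is 
an illustrative value, not part of the example above.

{code}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureRateRestartExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Terminate only if the job fails more than 10 times within one hour;
        // wait 10 seconds between restart attempts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                10,               // max failures per interval
                Time.hours(1),    // measuring interval
                Time.seconds(10)  // delay between restart attempts
        ));
    }
}
{code}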

[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

2016-07-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/1954
  
The changes look good to me. Thanks a lot for your work @fijolekProjects 
:-) Will be merging your PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-07-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70246597
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return Integer.valueOf(value);
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction for the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge<T> gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, String> {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue();
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    com.codahale.metrics.Histogram histogram =
+      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
+
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
+  }
+}
+{% endhighlight %}
+
+## Scope
  

[jira] [Commented] (FLINK-4116) Document metrics

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370634#comment-15370634
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r70246597
  
--- Diff: docs/apis/metrics.md ---
@@ -0,0 +1,441 @@
+---
+title: "Metrics"
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 13
+top-nav-title: "Metrics"
+---
+
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+#### Counter
+
+A `Counter` is used to count something. The current value can be in- or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a `MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+    this.counter.inc();
+    return Integer.valueOf(value);
+  }
+}
+
+{% endhighlight %}
+
+Alternatively you can also use your own `Counter` implementation:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCustomCounter", new CustomCounter());
+  }
+}
+
+{% endhighlight %}
+
+#### Gauge
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction for the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge<T> gauge)` on a 
`MetricGroup`.
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, String> {
+  private int valueToExpose;
+
+  @Override
+  public void open(Configuration config) {
+    getRuntimeContext()
+      .getMetricGroup()
+      .gauge("MyGauge", new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          return valueToExpose;
+        }
+      });
+  }
+}
+
+{% endhighlight %}
+
+Note that reporters will turn the exposed object into a `String`, which 
means that a meaningful `toString()` implementation is required.
+
+#### Histogram
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a `MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    this.histogram = getRuntimeContext()
+      .getMetricGroup()
+      .histogram("myHistogram", new MyHistogram());
+  }
+
+  @Override
+  public Integer map(Long value) throws Exception {
+    this.histogram.update(value);
+    return value.intValue();
+  }
+}
+{% endhighlight %}
+
+Flink does not provide a default implementation for `Histogram`, but 
offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard histograms.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-metrics-dropwizard</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard histogram like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Histogram histogram;
+
+  @Override
+  public void open(Configuration config) {
+    com.codahale.metrics.Histogram histogram =
+      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

[jira] [Commented] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation

2016-07-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370616#comment-15370616
 ] 

ASF GitHub Bot commented on FLINK-4157:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2206


> FlinkKafkaMetrics cause TaskManager shutdown during cancellation
> 
>
> Key: FLINK-4157
> URL: https://issues.apache.org/jira/browse/FLINK-4157
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.1.0
>
>
> The following issue was reported by a user:
> {code}
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Sink: KafkaOutput (59/72)
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: KafkaOutput (53/72) switched to CANCELED
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Sink: KafkaOutput (53/72)
> 2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy
>   - 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152)
>   at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at java.util.HashMap.internalWriteEntries(HashMap.java:1777)
>   at java.util.HashMap.writeObject(HashMap.java:1354)
>   at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>   at 
> java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691)
>   at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>   at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:48)
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78)
>   at 
> 

[GitHub] flink pull request #2206: [FLINK-4157] Catch Kafka metrics serialization exc...

2016-07-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2206


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

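The PR title suggests catching exceptions raised while serializing Kafka 
metrics, so that a flaky metric read during cancellation degrades an 
accumulator snapshot instead of crashing the TaskManager. A hedged sketch of 
that pattern; this is not the actual {{DefaultKafkaMetricAccumulator}} code, 
and the field and fallback value are invented.

{code}
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.kafka.common.metrics.KafkaMetric;

public class SafeMetricSnapshot implements Serializable {

    private transient KafkaMetric kafkaMetric;
    private double lastValue;

    private void writeObject(ObjectOutputStream out) throws IOException {
        try {
            // May throw ConcurrentModificationException while the producer
            // is being torn down, as in the stack trace above.
            lastValue = kafkaMetric.value();
        } catch (Exception e) {
            lastValue = 0.0; // degrade gracefully instead of failing the snapshot
        }
        out.defaultWriteObject();
    }
}
{code}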

[jira] [Resolved] (FLINK-4157) FlinkKafkaMetrics cause TaskManager shutdown during cancellation

2016-07-11 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4157.
---
Resolution: Fixed

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/36aad48e

> FlinkKafkaMetrics cause TaskManager shutdown during cancellation
> 
>
> Key: FLINK-4157
> URL: https://issues.apache.org/jira/browse/FLINK-4157
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
> Fix For: 1.1.0
>
>
> The following issue was reported by a user:
> {code}
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Sink: KafkaOutput (59/72)
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Sink: KafkaOutput (53/72) switched to CANCELED
> 2016-07-05 01:32:25,113 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Sink: KafkaOutput (53/72)
> 2016-07-05 01:32:25,144 ERROR akka.actor.OneForOneStrategy
>   - 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:106)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:211)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:383)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:57)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator.writeObject(DefaultKafkaMetricAccumulator.java:152)
>   at sun.reflect.GeneratedMethodAccessor20859.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at java.util.HashMap.internalWriteEntries(HashMap.java:1777)
>   at java.util.HashMap.writeObject(HashMap.java:1354)
>   at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>   at 
> java.util.Collections$SynchronizedMap.writeObject(Collections.java:2691)
>   at sun.reflect.GeneratedMethodAccessor226.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>   at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:48)
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>   at 
> org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:78)
>   at 
> 
