[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542617#comment-16542617
 ] 

ASF GitHub Bot commented on FLINK-7095:
---

Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/5375


> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6, pull-request-available
> Fix For: 1.6.0
>
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.
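For illustration, a minimal sketch of what stricter command line parsing at such
an entry point could look like, using Apache Commons CLI; the option names here
are assumptions for the example, not the actual {{TaskManagerRunner}} options:

{code}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class EntryPointCliSketch {

    public static void main(String[] args) throws ParseException {
        // Declare the expected options up front instead of scanning raw args.
        Options options = new Options();
        options.addOption(Option.builder("c")
            .longOpt("configDir")
            .hasArg()
            .required()
            .desc("Directory containing the Flink configuration")
            .build());

        // DefaultParser fails fast with a descriptive error on missing or
        // unknown options, unlike lenient ParameterTool-style parsing.
        CommandLineParser parser = new DefaultParser();
        CommandLine commandLine = parser.parse(options, args);

        String configDir = commandLine.getOptionValue("configDir");
        System.out.println("Loading configuration from " + configDir);
    }
}
{code}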



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542615#comment-16542615
 ] 

ASF GitHub Bot commented on FLINK-7095:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375
  
Hi, @tillrohrmann. You are welcome. I still have a lot of other Flink JIRA 
issues that I will address in the future.


> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6, pull-request-available
> Fix For: 1.6.0
>
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5375: [FLINK-7095] [TaskManager] Add Command line parsin...

2018-07-12 Thread zhangminglei
Github user zhangminglei closed the pull request at:

https://github.com/apache/flink/pull/5375


---


[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...

2018-07-12 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6323

[FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format 
interfaces

## What is the purpose of the change

This PR finalizes the efforts from #6264 and #6201 to provide unified 
interfaces for table sources, table sinks, and table formats. It reduces code 
duplication and cleans up the code base around factories.


## Brief change log

- Introduction of `org.apache.table.factories.TableFactory`, a common 
interface for factories
- Introduction of `org.apache.table.factories.TableFormatFactory`, a 
table factory specifically for formats
- Specific factories for `StreamTableSource`, `StreamTableSink`, 
`BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and 
`SerializationSchema`
- Deprecation of old format-specific table sources (sinks will be 
deprecated in a follow-up PR)
- Possibility to register table source and sink under a common name (table 
type `both` in SQL Client YAML)


## Verifying this change

- Existing tests verify the implementation
- Additional ITCases and unit tests have been added
- (An end-to-end test will follow in a separate PR)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6323


commit 980499f887d72ddf9a405c4ad200d0cab15d889c
Author: Timo Walther 
Date:   2018-06-27T11:16:49Z

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.
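As a rough illustration of the Java Service Provider pattern this discovery
builds on (a simplified sketch; the interface and matching logic here are
assumptions, not the actual Flink API):

```java
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical, simplified factory interface; the real Flink interfaces differ.
interface FormatFactory {
    // Properties that a descriptor must contain for this factory to match.
    Map<String, String> requiredContext();
}

class FormatDiscovery {
    // ServiceLoader reads META-INF/services entries from the classpath and
    // instantiates every registered FormatFactory implementation.
    static FormatFactory find(Map<String, String> descriptorProperties) {
        for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
            if (descriptorProperties.entrySet()
                    .containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalArgumentException("No matching format factory found");
    }
}
```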

commit 42a8a156d4e6f8f3d119c458350b6c897306fc48
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

[FLINK-8866] [table] Create unified interfaces to configure and instantiate 
TableSinks

This closes #6201.

commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9
Author: Timo Walther 
Date:   2018-07-11T11:29:03Z

Rename to TableFactory and move it to factories package

commit 1c581cba61ba321bb6de6a4d298a881840d11cfe
Author: Timo Walther 
Date:   2018-07-11T11:46:31Z

Refactor format factories

commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295
Author: Timo Walther 
Date:   2018-07-12T06:35:00Z

Unify table factories

commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916
Author: Timo Walther 
Date:   2018-07-12T07:05:50Z

Move table type out of descriptors

commit 6b83f2e1c0e63147f049dc5389c5633077b789a4
Author: Timo Walther 
Date:   2018-07-12T08:50:09Z

Make source/sink factories environment-dependent

commit 4f1255fd003080f078afe6ef67ffa58f40ffec36
Author: Timo Walther 
Date:   2018-07-12T18:48:45Z

Clean up and simplify changes




---


[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...

2018-07-12 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375
  
Hi, @tillrohrmann. You are welcome. I still have a lot of other Flink JIRA 
issues that I will address in the future.


---


[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542614#comment-16542614
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6323

[FLINK-8558] [FLINK-8866] [table] Finalize unified table source/sink/format 
interfaces

## What is the purpose of the change

This PR finalizes the efforts from #6264 and #6201 to provide unified 
interfaces for table sources, table sinks, and table formats. It reduces code 
duplication and cleans up the code base around factories.


## Brief change log

- Introduction of `org.apache.table.factories.TableFactory`, a common 
interface for factories
- Introduction of `org.apache.table.factories.TableFormatFactory`, a 
table factory specifically for formats
- Specific factories for `StreamTableSource`, `StreamTableSink`, 
`BatchTableSource`, `BatchTableSink`, `DeserializationSchema`, and 
`SerializationSchema`
- Deprecation of old format-specific table sources (sinks will be 
deprecated in a follow-up PR)
- Possibility to register table source and sink under a common name (table 
type `both` in SQL Client YAML)


## Verifying this change

- Existing tests verify the implementation
- Additional ITCases and unit tests have been added
- (An end-to-end test will follow in a separate PR)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink UnifiedInterfacesFinal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6323


commit 980499f887d72ddf9a405c4ad200d0cab15d889c
Author: Timo Walther 
Date:   2018-06-27T11:16:49Z

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.

commit 42a8a156d4e6f8f3d119c458350b6c897306fc48
Author: Shuyi Chen 
Date:   2018-06-19T19:00:34Z

[FLINK-8866] [table] Create unified interfaces to configure and instantiate 
TableSinks

This closes #6201.

commit 311dc62e59c0e4146c094b73c21b979f31b2e1d9
Author: Timo Walther 
Date:   2018-07-11T11:29:03Z

Rename to TableFactory and move it to factories package

commit 1c581cba61ba321bb6de6a4d298a881840d11cfe
Author: Timo Walther 
Date:   2018-07-11T11:46:31Z

Refactor format factories

commit 5c6df7598d1f1c3c698ae9b6b35eb37d7fff8295
Author: Timo Walther 
Date:   2018-07-12T06:35:00Z

Unify table factories

commit 0cd7c44c006aba21c32d8785d17bfc3dbee03916
Author: Timo Walther 
Date:   2018-07-12T07:05:50Z

Move table type out of descriptors

commit 6b83f2e1c0e63147f049dc5389c5633077b789a4
Author: Timo Walther 
Date:   2018-07-12T08:50:09Z

Make source/sink factories environment-dependent

commit 4f1255fd003080f078afe6ef67ffa58f40ffec36
Author: Timo Walther 
Date:   2018-07-12T18:48:45Z

Clean up and simplify changes




> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major

[jira] [Resolved] (FLINK-4807) ResourceManager clean up JobManager's registration

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4807.
--
   Resolution: Duplicate
Fix Version/s: 1.5.0

> ResourceManager clean up JobManager's registration
> --
>
> Key: FLINK-4807
> URL: https://issues.apache.org/jira/browse/FLINK-4807
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Priority: Major
> Fix For: 1.5.0
>
>
> When the RM receives a JM's registration, it records it together with a 
> leader id or a leadership listener. We should make sure that a finished or 
> failed JM can properly unregister itself with the RM.
> We can make this happen by doing two things (sketched below):
> 1. If the JM finds that its job has reached a terminal state (either success 
> or failure), it should send an unregistration request to the RM.
> 2. If (1) does not happen for whatever reason, the RM can rely on the 
> heartbeat manager to detect the timed-out JM and clean it up.
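A minimal sketch of the two cleanup paths, with all names hypothetical (this is
not the actual {{ResourceManager}} API):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; not actual Flink code.
class ResourceManagerSketch {

    // JobManager id -> recorded leader id.
    private final Map<String, String> registeredJobManagers = new ConcurrentHashMap<>();

    // Path 1: the JM proactively unregisters once its job reaches a terminal state.
    void unregisterJobManager(String jobManagerId) {
        registeredJobManagers.remove(jobManagerId);
    }

    // Path 2: the heartbeat manager reports a timeout and the RM cleans up the entry.
    void onJobManagerHeartbeatTimeout(String jobManagerId) {
        registeredJobManagers.remove(jobManagerId);
    }
}
{code}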



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7472) Release task managers gracefully

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7472.
--
Resolution: Done

> Release task managers gracefully
> 
>
> Key: FLINK-7472
> URL: https://issues.apache.org/jira/browse/FLINK-7472
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Priority: Major
>
> When a task manager is no longer needed (e.g. due to an idle timeout in the 
> slot manager), the RM should stop it gracefully, without spurious warnings. 
> This implies that some actions should be taken before the TM is actually 
> killed. Proactive steps include stopping the heartbeat monitor and sending a 
> disconnect message (see the sketch below).
> It is unclear whether the `RM::closeTaskManagerConnection` method should be 
> called proactively (when we plan to kill a TM), reactively (after the TM is 
> killed), or both.
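A minimal sketch of a graceful release sequence under these constraints; all
interface and method names are assumptions, not the actual Flink API:

{code}
// Hypothetical sketch of a graceful TM release; not actual Flink code.
class GracefulReleaseSketch {

    interface HeartbeatMonitor { void stopMonitoring(String taskManagerId); }
    interface TaskManagerGateway { void disconnect(String taskManagerId, String cause); }
    interface ResourceDriver { void stopWorker(String taskManagerId); }

    private final HeartbeatMonitor heartbeats;
    private final TaskManagerGateway gateway;
    private final ResourceDriver driver;

    GracefulReleaseSketch(HeartbeatMonitor heartbeats, TaskManagerGateway gateway, ResourceDriver driver) {
        this.heartbeats = heartbeats;
        this.gateway = gateway;
        this.driver = driver;
    }

    void releaseTaskManager(String taskManagerId) {
        // Stop monitoring first so the planned shutdown does not surface as a
        // spurious heartbeat-timeout warning.
        heartbeats.stopMonitoring(taskManagerId);
        // Tell the TM to disconnect so it stops offering slots.
        gateway.disconnect(taskManagerId, "TaskManager released due to idle timeout");
        // Only then release the underlying container or process.
        driver.stopWorker(taskManagerId);
    }
}
{code}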



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9004) Cluster test: Run general purpose job with failures with Yarn session

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542611#comment-16542611
 ] 

ASF GitHub Bot commented on FLINK-9004:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
I extended the regular `README.md`.


> Cluster test: Run general purpose job with failures with Yarn session
> -
>
> Key: FLINK-9004
> URL: https://issues.apache.org/jira/browse/FLINK-9004
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Similar to FLINK-8973, we should run the general purpose job (FLINK-8971) on 
> a Yarn session cluster and simulate failures.
> The job jar should be ill-packaged, meaning that we include too many 
> dependencies in the user jar. We should include the Scala library, Hadoop and 
> Flink itself to verify that there are no class loading issues.
> The general purpose job should run with misbehavior activated. Additionally, 
> we should simulate at least the following failure scenarios:
> * Kill Flink processes
> * Kill connection to storage system for checkpoints and jobs
> * Simulate network partition
> We should run the test at least with the following state backend: RocksDB 
> incremental async and checkpointing to S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6240: [FLINK-9004][tests] Implement Jepsen tests to test job av...

2018-07-12 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/6240
  
I extended the regular `README.md`.


---


[jira] [Assigned] (FLINK-7075) Implement Flip-6 standalone mode

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7075:


Assignee: Till Rohrmann

> Implement Flip-6 standalone mode
> 
>
> Key: FLINK-7075
> URL: https://issues.apache.org/jira/browse/FLINK-7075
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> This is an umbrella issue to sum up what's needed to implement Flink's 
> standalone mode with the new Flip-6 architecture



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7075) Implement Flip-6 standalone mode

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7075.

   Resolution: Fixed
Fix Version/s: 1.5.0

Added to Flink 1.5.0

> Implement Flip-6 standalone mode
> 
>
> Key: FLINK-7075
> URL: https://issues.apache.org/jira/browse/FLINK-7075
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> This is an umbrella issue to sum up what's needed to implement Flink's 
> standalone mode with the new Flip-6 architecture



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-4897) Implement Dispatcher to support Flink sessions

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4897.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Added to Flink 1.5.0: 748eba1b5363aeb9e64d379b9b25d5d997bec7ad

> Implement Dispatcher to support Flink sessions
> --
>
> Key: FLINK-4897
> URL: https://issues.apache.org/jira/browse/FLINK-4897
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
> Environment: FLIP-6 feature branch
>Reporter: Eron Wright 
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> This task is to implement the dispatcher component which reacts to calls from 
> the cluster's REST endpoint.
> The dispatcher is responsible for job submission, job listing, job leader 
> lookups, restarting jobs in case of a recovery and the cluster's component 
> lifecycle management. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-4834) Implement unified High Availability Services Abstraction

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4834.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

All subtasks have been completed in Flink 1.5.0.

> Implement unified High Availability Services Abstraction
> 
>
> Key: FLINK-4834
> URL: https://issues.apache.org/jira/browse/FLINK-4834
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4343) Implement new TaskManager

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4343:


Assignee: Till Rohrmann

> Implement new TaskManager
> -
>
> Key: FLINK-4343
> URL: https://issues.apache.org/jira/browse/FLINK-4343
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> This is the parent issue for the efforts to implement the {{TaskManager}} 
> changes based on FLIP-6 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)
> Because of the breadth of changes, we should implement a new version of the 
> {{TaskManager}} (let's call it {{TaskExecutor}}) rather than updating the 
> current {{TaskManager}}. That will allow us to keep a working master branch.
> At the point when the new cluster management is on par with the current 
> implementation, we will drop the old {{TaskManager}} and rename the 
> {{TaskExecutor}} to {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-4343) Implement new TaskManager

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-4343.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

The {{TaskExecutor}} has been added to Flink 1.5.0.

> Implement new TaskManager
> -
>
> Key: FLINK-4343
> URL: https://issues.apache.org/jira/browse/FLINK-4343
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> This is the parent issue for the efforts to implement the {{TaskManager}} 
> changes based on FLIP-6 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)
> Because of the breadth of changes, we should implement a new version of the 
> {{TaskManager}} (let's call it {{TaskExecutor}}) rather than updating the 
> current {{TaskManager}}. That will allow us to keep a working master branch.
> At the point when the new cluster management is on par with the current 
> implementation, we will drop the old {{TaskManager}} and rename the 
> {{TaskExecutor}} to {{TaskManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7469) Handle slot requests occurring before RM registration completes

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7469.
--
   Resolution: Duplicate
Fix Version/s: 1.5.0

This issue should have been fixed with FLINK-9427.

> Handle slot requests occurring before RM registration completes
> --
>
> Key: FLINK-7469
> URL: https://issues.apache.org/jira/browse/FLINK-7469
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Priority: Minor
> Fix For: 1.5.0
>
> Attachments: jm.log, taskmanager-3.log
>
>
> *Description*
> Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
> registration for 10 seconds. Meanwhile, the registration may actually have 
> succeeded in the RM. Slot requests may then arrive at the TM while the RM 
> registration is incomplete.
> The current behavior appears to be that the TM honors the slot request. 
> Please determine whether this is a feature or a bug. If it is a feature, maybe 
> a slot request should implicitly complete the registration.
> *Example*
> See the attached log showing a certain TM exhibiting the described behavior.
> The RM launched 12 TMs in parallel, evidently causing the RM to respond 
> sluggishly to a couple of the TM registration requests. From the logs we see 
> that '00012' and '3' experienced a registration timeout but accepted a 
> slot request anyway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7095.

   Resolution: Duplicate
Fix Version/s: 1.6.0

> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6, pull-request-available
> Fix For: 1.6.0
>
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7095:
--
Labels: flip-6 pull-request-available  (was: flip-6)

> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6, pull-request-available
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542601#comment-16542601
 ] 

ASF GitHub Bot commented on FLINK-7095:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5375
  
Sorry for not getting back to you earlier @zhangminglei. I accidentally 
addressed this issue with #6318. I think we can therefore close this PR.

Sorry for the bad PR management. This won't happen again.


> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6, pull-request-available
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5375
  
Sorry for not getting back to you earlier @zhangminglei. I accidentally 
addressed this issue with #6318. I think we can therefore close this PR.

Sorry for the bad PR management. This won't happen again.


---


[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542600#comment-16542600
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202254410
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
+
+If you want to build the container for a specific version of 
flink/hadoop/scala
+you can configure it in the respective args:
+
+docker build --build-arg FLINK_VERSION=1.0.3 --build-arg 
HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t 
"flink:1.0.3-hadoop2.6-scala_2.10" flink
--- End diff --

Good point, will update to the latest Flink version.


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202254368
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
--- End diff --

It applies only to the job cluster mode. Will update the title.


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202254410
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
+
+If you want to build the container for a specific version of 
flink/hadoop/scala
+you can configure it in the respective args:
+
+docker build --build-arg FLINK_VERSION=1.0.3 --build-arg 
HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t 
"flink:1.0.3-hadoop2.6-scala_2.10" flink
--- End diff --

Good point, will update to the latest Flink version.


---


[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542599#comment-16542599
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202254368
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
--- End diff --

It applies only to the job cluster mode. Will update the title.


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2018-07-12 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542596#comment-16542596
 ] 

Till Rohrmann commented on FLINK-4319:
--

[~yazdanjs] thanks for your interest. The community has implemented a good part 
of the whole Flip-6 effort. I think we can actually close the linked issues here.

However, there are still plenty of open issues and potential improvements that 
one could apply to Flink's distributed architecture. Most of these issues are 
assigned to the `Distributed Coordination` component, so please look for open 
issues under that component.

I will update the old Flip-6 JIRA issues to reflect the current state.

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-07-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542594#comment-16542594
 ] 

Chesnay Schepler commented on FLINK-5860:
-

[~maheshsenni] the Cassandra example should not be touched, as it is not 
actually a test.

The {{TestingTaskManagerRuntimeInfo}} constructor should be removed.
The {{RecordOrEventCollectingResultPartitionWriter}} and 
{{RecordCollectingResultPartitionWriter}} constructors should be modified to 
accept a temp directory.

Note that it is perfectly fine to address this JIRA in multiple PRs. This will 
make things easier to review and reduce the risk of your changes becoming 
outdated.
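For reference, the intended replacement pattern is JUnit 4's {{TemporaryFolder}}
rule; a minimal sketch (the folder name is illustrative):

{code}
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TempDirExampleTest {

    // JUnit creates a fresh directory before each test and deletes it
    // afterwards, unlike files left behind under java.io.tmpdir.
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void usesManagedTempDir() throws Exception {
        File tempDir = temporaryFolder.newFolder("flink-backend");
        // ... use tempDir instead of new File(System.getProperty("java.io.tmpdir"))
    }
}
{code}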

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Mahesh Senniappan
>Priority: Major
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project 
> to get a list of unit tests. Replace all file creation based on 
> `java.io.tmpdir` with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static

[jira] [Commented] (FLINK-9771) "Show Plan" option under Submit New Job in WebUI not working

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542591#comment-16542591
 ] 

ASF GitHub Bot commented on FLINK-9771:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6274


>  "Show Plan" option under Submit New Job in WebUI not working 
> --
>
> Key: FLINK-9771
> URL: https://issues.apache.org/jira/browse/FLINK-9771
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, Webfrontend
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Yazdan Shirvany
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> {{Show Plan}} button under {{Submit new job}} in WebUI not working.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542588#comment-16542588
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6170


> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542592#comment-16542592
 ] 

ASF GitHub Bot commented on FLINK-9091:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6102


> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9801) flink-dist is missing dependency on flink-examples

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542589#comment-16542589
 ] 

ASF GitHub Bot commented on FLINK-9801:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6304


> flink-dist is missing dependency on flink-examples
> --
>
> Key: FLINK-9801
> URL: https://issues.apache.org/jira/browse/FLINK-9801
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Examples
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> For the assembly of {{flink-dist}} we copy various batch/streaming examples 
> directly from the respective /target directory.
> Never mind that this is already a problem as is (see FLINK-9582), 
> {{flink-dist}} defines no dependency on these modules.
> If you were to only compile {{flink-dist}} with the {{-am}} flag (to also 
> build all dependencies) it thus _may_ or _may not_ happen that these modules 
> are actually compiled, which could cause these examples to not be included in 
> the final assembly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9810) JarListHandler does not close opened jars

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542590#comment-16542590
 ] 

ASF GitHub Bot commented on FLINK-9810:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6310


> JarListHandler does not close opened jars
> -
>
> Key: FLINK-9810
> URL: https://issues.apache.org/jira/browse/FLINK-9810
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.2, 1.6.0
>
>
> {code}
> try {
>   JarFile jar = new JarFile(f);
>   Manifest manifest = jar.getManifest();
>   String assemblerClass = null;
>   if (manifest != null) {
>   assemblerClass = 
> manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
>   if (assemblerClass == null) {
>   assemblerClass = 
> manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
>   }
>   }
>   if (assemblerClass != null) {
>   classes = assemblerClass.split(",");
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6170: [FLINK-9563]: Using a custom sink function for tes...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6170


---


[GitHub] flink pull request #6304: [FLINK-9801][build] Add missing example dependenci...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6304


---


[GitHub] flink pull request #6102: [FLINK-9091][build] Dependency convergence run aga...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6102


---


[GitHub] flink pull request #6274: [FLINK-9771][rest] Fix plan JSON response

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6274


---


[GitHub] flink pull request #6310: [FLINK-9810][rest] Close jar file in JarListHandle...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6310


---


[jira] [Closed] (FLINK-9563) Migrate integration tests for CEP

2018-07-12 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9563.
---
   Resolution: Fixed
Fix Version/s: 1.6.0

master: 40f9131e9136f4f956c59e4c0c837afba8b9bb4d

> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542577#comment-16542577
 ] 

ASF GitHub Bot commented on FLINK-9276:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5954


> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-07-12 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5954


---


[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542576#comment-16542576
 ] 

ASF GitHub Bot commented on FLINK-9276:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
OK, closing this PR...


> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-07-12 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
OK, closing this PR...


---


[GitHub] flink pull request #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckp...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r202249510
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -99,11 +110,21 @@
 
private AbstractStateBackend stateBackend;
 
+   @Parameterized.Parameter
+   public StateBackendEnum stateBackendEnum;
+
enum StateBackendEnum {
MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, 
ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
}
 
-   protected abstract StateBackendEnum getStateBackend();
+   @Parameterized.Parameters(name = "statebackend type ={0}")
+   public static Collection parameter() {
+   return Arrays.asList(MEM, FILE, ROCKSDB_FULLY_ASYNC, 
ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC);
--- End diff --

Here we could say `Arrays.asList(StateBackendEnum.values())`.
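A minimal self-contained sketch of the parameterized pattern under discussion
(simplified; not the actual test class):

```java
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class StateBackendParameterizedSketch {

    enum StateBackendEnum { MEM, FILE, ROCKSDB_FULLY_ASYNC }

    // Each enum value produces one run of every @Test method.
    @Parameterized.Parameters(name = "statebackend type = {0}")
    public static Collection<StateBackendEnum> parameters() {
        // values() keeps the parameter list in sync when new backends are added.
        return Arrays.asList(StateBackendEnum.values());
    }

    @Parameterized.Parameter
    public StateBackendEnum stateBackendEnum;

    @Test
    public void runsOncePerBackend() {
        System.out.println("Running with " + stateBackendEnum);
    }
}
```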


---


[jira] [Commented] (FLINK-9807) Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542572#comment-16542572
 ] 

ASF GitHub Bot commented on FLINK-9807:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6305#discussion_r202249510
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -99,11 +110,21 @@
 
private AbstractStateBackend stateBackend;
 
+   @Parameterized.Parameter
+   public StateBackendEnum stateBackendEnum;
+
enum StateBackendEnum {
MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, 
ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
}
 
-   protected abstract StateBackendEnum getStateBackend();
+   @Parameterized.Parameters(name = "statebackend type ={0}")
+   public static Collection parameter() {
+   return Arrays.asList(MEM, FILE, ROCKSDB_FULLY_ASYNC, 
ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC);
--- End diff --

Here we could say `Arrays.asList(StateBackendEnum.values())`.


> Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized
> --
>
> Key: FLINK-9807
> URL: https://issues.apache.org/jira/browse/FLINK-9807
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, `AbstractEventTimeWindowCheckpointingITCase` and 
> `AbstractLocalRecoveryITCase` need to be re-implemented for every backend; we 
> can improve this by using JUnit's parameterized tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9764:
-
Priority: Critical  (was: Major)

> Failure in LocalRecoveryRocksDBFullITCase
> -
>
> Key: FLINK-9764
> URL: https://issues.apache.org/jira/browse/FLINK-9764
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> {code}
> Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
> Starting null#executeTest.
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but 
> was:<1209>
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> 
> but was:<1209>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInput

[jira] [Updated] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9764:
-
Fix Version/s: 1.6.0

> Failure in LocalRecoveryRocksDBFullITCase
> -
>
> Key: FLINK-9764
> URL: https://issues.apache.org/jira/browse/FLINK-9764
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> {code}
> Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
> Starting null#executeTest.
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but 
> was:<1209>
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> 
> but was:<1209>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.r

[jira] [Assigned] (FLINK-9807) Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9807:


Assignee: Congxian Qiu

> Improve EventTimeWindowCheckpointITCase&LocalRecoveryITCase with parameterized
> --
>
> Key: FLINK-9807
> URL: https://issues.apache.org/jira/browse/FLINK-9807
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> Now, `AbstractEventTimeWindowCheckpointingITCase` and 
> `AbstractLocalRecoveryITCase` need to be re-implemented for every backend; we 
> can improve this by using JUnit's parameterized tests.
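
A minimal sketch of the parameterization idea (the backend identifiers and the test body here are illustrative, not the actual Flink test code):

```java
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

// One parameterized test class instead of one subclass per state backend.
@RunWith(Parameterized.class)
public class EventTimeWindowCheckpointingITCase {

	private final String stateBackend;

	public EventTimeWindowCheckpointingITCase(String stateBackend) {
		this.stateBackend = stateBackend;
	}

	@Parameters(name = "stateBackend = {0}")
	public static Collection<Object[]> backends() {
		// Illustrative identifiers; the real test would enumerate the memory,
		// filesystem, RocksDB full, and RocksDB incremental backends.
		return Arrays.asList(new Object[][] {
			{"memory"}, {"filesystem"}, {"rocksdb-full"}, {"rocksdb-incremental"}
		});
	}

	@Test
	public void testTumblingTimeWindow() {
		// Configure the test cluster with 'stateBackend' and run the shared
		// checkpointing scenario here.
	}
}
```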



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542549#comment-16542549
 ] 

ASF GitHub Bot commented on FLINK-9276:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5954
  
I think this is not super critical, and since we only pass in `Exceptions` 
at the moment, I would say let's close this PR.


> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5954
  
I think this is not super critical, and since we only pass in `Exceptions` 
at the moment, I would say let's close this PR.


---


[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542547#comment-16542547
 ] 

ASF GitHub Bot commented on FLINK-9575:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248523
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
--- End diff --

This call should also happen if the removal of the job from the 
`SubmittedJobGraphStore` failed, because it does not remove any HA files.
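
Taken together with the other review comments below, the suggested ordering might look like this; a minimal sketch with placeholder types, expressed with Java's `CompletableFuture` for illustration rather than the actual Scala `JobManager` code:

```java
import java.util.concurrent.CompletableFuture;

final class JobCleanupSketch {

	// Placeholder collaborators, not Flink's actual classes.
	interface BlobStore { void cleanupJob(String jobId, boolean removeHaFiles); }
	interface LibraryCache { void unregisterJob(String jobId); }

	static void removeJob(String jobId,
			CompletableFuture<Void> graphRemoval,
			BlobStore blobs,
			LibraryCache libraries) {
		graphRemoval.whenComplete((ignored, failure) -> {
			if (failure == null) {
				// Delete HA BLOBs only once the job graph is really gone;
				// otherwise a newly elected leader may recover a graph
				// whose BLOBs no longer exist.
				blobs.cleanupJob(jobId, true);
			}
			// Local-only cleanup does not touch HA files, so it can run
			// regardless of whether the removal succeeded.
			libraries.unregisterJob(jobId);
		});
	}
}
```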


> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from the _JobManager_, for example after 
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the job graph to be removed asynchronously while the BLOB files 
> connected with this job are removed synchronously. As far as I understand, this 
> means we can fail to remove the job graph from _submittedJobGraphs_ while its 
> BLOBs are already gone. If the JobManager fails and a new leader is elected, it 
> can try to recover such a job, but it will fail with an exception since the 
> associated BLOBs were already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542546#comment-16542546
 ] 

ASF GitHub Bot commented on FLINK-9575:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248449
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+  jobManagerMetricGroup.removeJob(jobID)
--- End diff --

I think we could always execute this call independently of whether the 
removal from the `SubmittedJobGraphStore` was successful or not.


> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from the _JobManager_, for example after 
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the job graph to be removed asynchronously while the BLOB files 
> connected with this job are removed synchronously. As far as I understand, this 
> means we can fail to remove the job graph from _submittedJobGraphs_ while its 
> BLOBs are already gone. If the JobManager fails and a new leader is elected, it 
> can try to recover such a job, but it will fail with an exception since the 
> associated BLOBs were already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542548#comment-16542548
 ] 

ASF GitHub Bot commented on FLINK-9575:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248365
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Can't we move this line into the future where we remove the job from 
the `SubmittedJobGraphStore`?


> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from the _JobManager_, for example after 
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the job graph to be removed asynchronously while the BLOB files 
> connected with this job are removed synchronously. As far as I understand, this 
> means we can fail to remove the job graph from _submittedJobGraphs_ while its 
> BLOBs are already gone. If the JobManager fails and a new leader is elected, it 
> can try to recover such a job, but it will fail with an exception since the 
> associated BLOBs were already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248523
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
--- End diff --

This call should also happen if the removal of the job from the 
`SubmittedJobGraphStore` failed, because it does not remove any HA files.


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248449
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+  jobManagerMetricGroup.removeJob(jobID)
--- End diff --

I think we could always execute this call independently of whether the 
removal from the `SubmittedJobGraphStore` was successful or not.


---


[GitHub] flink pull request #6322: [FLINK-9575]: Remove job-related BLOBS only if the...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6322#discussion_r202248365
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1759,11 +1759,22 @@ class JobManager(
   case None => None
 }
 
-// remove all job-related BLOBs from local and HA store
-libraryCacheManager.unregisterJob(jobID)
-blobServer.cleanupJob(jobID, removeJobFromStateBackend)
+// remove all job-related BLOBs from local and HA store, only if the 
job was removed correctly
+futureOption match {
+  case Some(future) => future.onComplete{
+case scala.util.Success(_) => {
+  libraryCacheManager.unregisterJob(jobID)
+  blobServer.cleanupJob(jobID, removeJobFromStateBackend)
--- End diff --

Can't we move this line into the future where we remove the job from 
the `SubmittedJobGraphStore`?


---


[jira] [Commented] (FLINK-9832) Allow commas in job submission query params

2018-07-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542526#comment-16542526
 ] 

Chesnay Schepler commented on FLINK-9832:
-

Ha, it's not a parsing error; the _handler_ fails the request, so we're good.

> Allow commas in job submission query params
> ---
>
> Key: FLINK-9832
> URL: https://issues.apache.org/jira/browse/FLINK-9832
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.1
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> As reported on the user mailing list in the thread "Run programs w/ params 
> including comma via REST api" [1], submitting a job with mainArgs that 
> include a comma results in an exception.
> To reproduce submit a job with the following mainArgs:
> {code}
> --servers 10.100.98.9:9092,10.100.98.237:9092
> {code}
> The request fails with
> {code}
> Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092].
> {code}
> As a workaround, users have to use a different delimiter such as {{;}}.
> The proper fix of this API would make these params part of the {{POST}} 
> request instead of relying on query params (as noted in FLINK-9499). I think 
> it's still valuable to fix this as part of a bug fix release for 1.5.
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9832) Allow commas in job submission query params

2018-07-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542522#comment-16542522
 ] 

Chesnay Schepler commented on FLINK-9832:
-

Hold on, that doesn't quite work, since a parameter parsing error fails the 
request...

> Allow commas in job submission query params
> ---
>
> Key: FLINK-9832
> URL: https://issues.apache.org/jira/browse/FLINK-9832
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.1
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> As reported on the user mailing list in the thread "Run programs w/ params 
> including comma via REST api" [1], submitting a job with mainArgs that 
> include a comma results in an exception.
> To reproduce submit a job with the following mainArgs:
> {code}
> --servers 10.100.98.9:9092,10.100.98.237:9092
> {code}
> The request fails with
> {code}
> Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092].
> {code}
> As a workaround, users have to use a different delimiter such as {{;}}.
> The proper fix of this API would make these params part of the {{POST}} 
> request instead of relying on query params (as noted in FLINK-9499). I think 
> it's still valuable to fix this as part of a bug fix release for 1.5.
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9832) Allow commas in job submission query params

2018-07-12 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542520#comment-16542520
 ] 

Chesnay Schepler commented on FLINK-9832:
-

I have an idea to fix this in a better way for now. The core issue with 
switching to POST is that the legacy handlers do not have access to the request body.
We can avoid this problem by sending the parameters both in the POST body and as 
query parameters.

The legacy handlers will continue to use the query parameters and won't even be 
aware that anything has changed; the new handlers will access the (optional!) 
POST body and prioritize it over query parameters.
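
A minimal sketch of that resolution order (handler and parameter names are illustrative assumptions, not the actual REST API):

```java
import java.util.List;
import java.util.Map;

final class ProgramArgsResolver {

	/**
	 * Prefer the (optional) POST body over query parameters, so that
	 * comma-containing arguments survive while legacy handlers that only
	 * read query parameters keep working unchanged.
	 */
	static String resolveProgramArgs(Map<String, String> postBody,
			Map<String, List<String>> queryParams) {
		String fromBody = postBody.get("programArgs"); // hypothetical field name
		if (fromBody != null) {
			return fromBody;
		}
		// Fallback for requests that only carry query parameters.
		List<String> fromQuery = queryParams.get("program-args"); // hypothetical
		return (fromQuery == null || fromQuery.isEmpty()) ? "" : fromQuery.get(0);
	}
}
```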

> Allow commas in job submission query params
> ---
>
> Key: FLINK-9832
> URL: https://issues.apache.org/jira/browse/FLINK-9832
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.1
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> As reported on the user mailing list in the thread "Run programs w/ params 
> including comma via REST api" [1], submitting a job with mainArgs that 
> include a comma results in an exception.
> To reproduce submit a job with the following mainArgs:
> {code}
> --servers 10.100.98.9:9092,10.100.98.237:9092
> {code}
> The request fails with
> {code}
> Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092].
> {code}
> As a workaround, users have to use a different delimiter such as {{;}}.
> The proper fix of this API would make these params part of the {{POST}} 
> request instead of relying on query params (as noted in FLINK-9499). I think 
> it's still valuable to fix this as part of a bug fix release for 1.5.
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9832) Allow commas in job submission query params

2018-07-12 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-9832:
---

Assignee: Chesnay Schepler

> Allow commas in job submission query params
> ---
>
> Key: FLINK-9832
> URL: https://issues.apache.org/jira/browse/FLINK-9832
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.1
>Reporter: Ufuk Celebi
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> As reported on the user mailing list in the thread "Run programs w/ params 
> including comma via REST api" [1], submitting a job with mainArgs that 
> include a comma results in an exception.
> To reproduce submit a job with the following mainArgs:
> {code}
> --servers 10.100.98.9:9092,10.100.98.237:9092
> {code}
> The request fails with
> {code}
> Expected only one value [--servers 10.100.98.9:9092, 10.100.98.237:9092].
> {code}
> As a workaround, users have to use a different delimiter such as {{;}}.
> The proper fix of this API would make these params part of the {{POST}} 
> request instead of relying on query params (as noted in FLINK-9499). I think 
> it's still valuable to fix this as part of a bug fix release for 1.5.
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Run-programs-w-params-including-comma-via-REST-api-td19662.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9413) Tasks can fail with PartitionNotFoundException if consumer deployment takes too long

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542478#comment-16542478
 ] 

ASF GitHub Bot commented on FLINK-9413:
---

Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/6103
  
@tillrohrmann already did that; it seems to alleviate the problem, though not 
fix it. I'm upgrading from 1.2.0 to 1.4.2. The major difference I can see is 
that the TM now connects to HDFS instead of only talking to the JobManager; 
could this increase the likelihood of this issue?
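
For reference, the workaround being discussed is presumably raising the network request backoff in flink-conf.yaml; a minimal example (the values are illustrative; only the `max` key is quoted in the issue below):

```
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 30000
```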


> Tasks can fail with PartitionNotFoundException if consumer deployment takes 
> too long
> 
>
> Key: FLINK-9413
> URL: https://issues.apache.org/jira/browse/FLINK-9413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: zhangminglei
>Priority: Critical
>  Labels: pull-request-available
>
> {{Tasks}} can fail with a {{PartitionNotFoundException}} if the deployment of 
> the producer takes too long. More specifically, if it takes longer than the 
> {{taskmanager.network.request-backoff.max}}, then the {{Task}} will give up 
> and fail.
> The problem is that we calculate the {{InputGateDeploymentDescriptor}} for a 
> consuming task once the producer has been assigned a slot but we do not wait 
> until it is actually running. The problem should be fixed if we wait until 
> the task is in state {{RUNNING}} before assigning the result partition to the 
> consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

2018-07-12 Thread RalphSu
Github user RalphSu commented on the issue:

https://github.com/apache/flink/pull/6103
  
@tillrohrmann already did that; it seems to alleviate the problem, though not 
fix it. I'm upgrading from 1.2.0 to 1.4.2. The major difference I can see is 
that the TM now connects to HDFS instead of only talking to the JobManager; 
could this increase the likelihood of this issue?


---


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542428#comment-16542428
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
The idea here is that `maxNumberOfRecordsPerFetch` should never be a value 
that fetches records exceeding the read limit (2 MB/sec), from the math here:
```
2 MB/sec / (averageRecordSizeBytes * # reads/sec)
```
At least, that's the intent - let me know if that makes sense or if 
there is something amiss about the approach here. If there is a way in which 
`maxNumberOfRecordsPerFetch` is set such that the limit is exceeded, then yes, 
it will still be throttled by Kinesis.
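
Spelled out as code, the computation looks roughly like this (a sketch of the formula only; the constant and the clamping follow the diff discussed below, with extra guards added here):

```java
final class AdaptiveFetchSizeSketch {

	// Kinesis per-shard read limit: 2 MB per second.
	private static final long SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;

	static int adaptiveMaxRecordsPerFetch(long averageRecordSizeBytes,
			long fetchIntervalMillis,
			int configuredMax) {
		if (averageRecordSizeBytes <= 0 || fetchIntervalMillis <= 0) {
			return configuredMax;
		}
		// reads/sec = 1000 / fetchIntervalMillis, so the bytes consumed per
		// second at fetch size N are averageRecordSizeBytes * N * reads/sec.
		long bytesPerSecondPerRecord =
			averageRecordSizeBytes * 1000L / fetchIntervalMillis;
		if (bytesPerSecondPerRecord == 0) {
			return configuredMax;
		}
		int adapted = (int) (SHARD_BYTES_PER_SECOND_LIMIT / bytesPerSecondPerRecord);
		// Never exceed the configured maximum, and fetch at least one record.
		return Math.max(1, Math.min(adapted, configuredMax));
	}
}
```

With 10 KB average records and a 1 s fetch interval, for example, this yields 2 MB / 10 KB, i.e. about 204 records per fetch.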


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
The idea here is that `maxNumberOfRecordsPerFetch` should never be a value 
that fetches records exceeding the read limit (2 MB/sec), from the math here:
```
2 MB/sec / (averageRecordSizeBytes * # reads/sec)
```
At least, that's the intent - let me know if that makes sense or if 
there is something amiss about the approach here. If there is a way in which 
`maxNumberOfRecordsPerFetch` is set such that the limit is exceeded, then yes, 
it will still be throttled by Kinesis.


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227845
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List deaggregateRecords(List 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to optimize 2 Mb / sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   protected int getAdaptiveMaxRecordsPerFetch(long 
averageRecordSizeBytes) {
+   int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+   if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+   adaptedMaxRecordsPerFetch = (int) 
(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / 
fetchIntervalMillis));
+
+   // Ensure the value is not more than 1L
+   adaptedMaxRecordsPerFetch = 
adaptedMaxRecordsPerFetch <= 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

Changed.


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227834
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
 
+   /** The config to turn on adaptive reads from a shard. */
+   public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.use.adaptive.reads";
--- End diff --

Changed.


---


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542423#comment-16542423
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227834
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
 
+   /** The config to turn on adaptive reads from a shard. */
+   public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.use.adaptive.reads";
--- End diff --

Changed.


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542424#comment-16542424
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227845
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List deaggregateRecords(List 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to optimize 2 Mb / sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   protected int getAdaptiveMaxRecordsPerFetch(long 
averageRecordSizeBytes) {
+   int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+   if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+   adaptedMaxRecordsPerFetch = (int) 
(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / 
fetchIntervalMillis));
+
+   // Ensure the value is not more than 1L
+   adaptedMaxRecordsPerFetch = 
adaptedMaxRecordsPerFetch <= 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

Changed.


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542293#comment-16542293
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202201507
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List deaggregateRecords(List 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to optimize 2 Mb / sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   protected int getAdaptiveMaxRecordsPerFetch(long 
averageRecordSizeBytes) {
+   int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+   if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+   adaptedMaxRecordsPerFetch = (int) 
(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / 
fetchIntervalMillis));
+
+   // Ensure the value is not more than 1L
+   adaptedMaxRecordsPerFetch = 
adaptedMaxRecordsPerFetch <= 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542292#comment-16542292
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202199865
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
 
+   /** The config to turn on adaptive reads from a shard. */
+   public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.use.adaptive.reads";
--- End diff --

[Most of Flink's feature 
flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) 
are named `xx.enabled`; I'd suggest renaming it to something like 
`flink.shard.adaptive.read.records.enabled`.


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation i.e. you could be in a 
> transient scenario where you are reading large sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202199865
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
 
+   /** The config to turn on adaptive reads from a shard. */
+   public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.use.adaptive.reads";
--- End diff --

[Most of Flink's feature 
flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) 
are named `xx.enabled`; I'd suggest renaming it to something like 
`flink.shard.adaptive.read.records.enabled`.


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202201507
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List deaggregateRecords(List 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to optimize 2 Mb / sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   protected int getAdaptiveMaxRecordsPerFetch(long 
averageRecordSizeBytes) {
+   int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+   if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+   adaptedMaxRecordsPerFetch = (int) 
(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / 
fetchIntervalMillis));
+
+   // Ensure the value is not more than 1L
+   adaptedMaxRecordsPerFetch = 
adaptedMaxRecordsPerFetch <= 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);


---


[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542273#comment-16542273
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202198679
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
--- End diff --

Does this apply to both standalone and cluster mode? I'd like clarification, 
since the PR title says it's for StandaloneJobCluster.


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9822) Add Dockerfile for StandaloneJobClusterEntryPoint image

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542272#comment-16542272
 ] 

ASF GitHub Bot commented on FLINK-9822:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202197686
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
+
+If you want to build the container for a specific version of 
flink/hadoop/scala
+you can configure it in the respective args:
+
+docker build --build-arg FLINK_VERSION=1.0.3 --build-arg 
HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t 
"flink:1.0.3-hadoop2.6-scala_2.10" flink
--- End diff --

Is FLINK_VERSION 1.0.3 only for demo purposes? Can we use a more recent 
version for demoing?
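
For instance, assuming release artifacts exist for the combination (the version numbers here are only an example), the same build-arg pattern from the README would read:

    docker build --build-arg FLINK_VERSION=1.5.1 --build-arg HADOOP_VERSION=28 --build-arg SCALA_VERSION=2.11 -t "flink:1.5.1-hadoop2.8-scala_2.11" flink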


> Add Dockerfile for StandaloneJobClusterEntryPoint image
> ---
>
> Key: FLINK-9822
> URL: https://issues.apache.org/jira/browse/FLINK-9822
> Project: Flink
>  Issue Type: New Feature
>  Components: Docker
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Add a {{Dockerfile}} to create an image which contains the 
> {{StandaloneJobClusterEntryPoint}} and a specified user code jar. The 
> entrypoint of this image should start the {{StandaloneJobClusterEntryPoint}} 
> with the added user code jar. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202198679
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
--- End diff --

Does this apply to both standalone and cluster mode? I'd like to get this 
clarified, since the PR title says it's for the standalone job cluster.


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202197686
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
+
+If you want to build the container for a specific version of 
flink/hadoop/scala
+you can configure it in the respective args:
+
+docker build --build-arg FLINK_VERSION=1.0.3 --build-arg 
HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t 
"flink:1.0.3-hadoop2.6-scala_2.10" flink
--- End diff --

Is FLINK_VERSION 1.0.3 only for demo purposes? Can we use a more recent 
version for the example?


---


[jira] [Assigned] (FLINK-9703) Mesos does not expose TM Prometheus port

2018-07-12 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9703:


Assignee: Rune Skou Larsen

> Mesos does not expose TM Prometheus port
> 
>
> Key: FLINK-9703
> URL: https://issues.apache.org/jira/browse/FLINK-9703
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Reporter: Rune Skou Larsen
>Assignee: Rune Skou Larsen
>Priority: Major
>  Labels: pull-request-available
>
> LaunchableMesosWorker makes Mesos expose these ports for a Task Manager:
> {{private static final String[] TM_PORT_KEYS = {}}
> {{ "taskmanager.rpc.port",}}
> {{ "taskmanager.data.port"};}}
> But when running Prometheus Exporter on a TM, another port needs to be 
> exposed to make Flink's Prometheus endpoint externally scrapable by the 
> Prometheus server. By default this is port 9249, but it is configurable 
> according to:
> [https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter]
>  
> My plan is to make a PR that just adds another config option for Mesos to 
> enable custom ports to be exposed in the provisioned TMs.
> I considered carrying parts of the Metrics config into the Mesos code to 
> automatically map metrics ports in Mesos. But making such a "shortcut" 
> between Flink's metrics and Mesos modules would probably need some sort of 
> integration testing, so I prefer the simple solution of just adding another 
> Mesos config option. But comments are welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9703) Mesos does not expose TM Prometheus port

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542188#comment-16542188
 ] 

ASF GitHub Bot commented on FLINK-9703:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6288#discussion_r202171601
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -332,6 +334,22 @@ public String toString() {
return taskInfo.build();
}
 
+   /**
+* Get port keys representing the TM's configured endpoints. This 
includes mandatory TM endpoints such as
+* data and rpc as well as optionally configured endpoints for services 
such as the prometheus reporter.
+*
+* @return A deterministically ordered Set of port keys to expose from 
the TM container
+*/
+   private Set<String> getPortKeys() {
+   LinkedHashSet<String> tmPortKeys = new 
LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+   containerSpec.getDynamicConfiguration().keySet().stream()
+   .filter(key -> key.endsWith(".port") || 
key.endsWith(".ports"))  // This matches property naming convention
--- End diff --

I agree that simply taking all configuration values which end with `port` 
and `ports` is problematic. What about configuration values whose value is 
needed, e.g. remote ports, and should not be overwritten? For example, 
`jobmanager.rpc.port` is one of these configuration values.

I would rather prefer a Mesos-specific configuration value which we can use 
to define which ports need to be dynamically assigned. For example 
`mesos.dynamic-port-assignment: "metrics.prom.port, taskmanager.rpc.port, 
taskmanager.data.port"`. What do you think?
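
For illustration, a minimal sketch of how `getPortKeys()` could honor such an 
option (the option name `mesos.dynamic-port-assignment` is the proposal above, 
not an existing setting; the surrounding fields are taken from the excerpt):

{code:java}
// Sketch only: expose the mandatory TM ports plus any port keys listed in
// the proposed "mesos.dynamic-port-assignment" option.
private Set<String> getPortKeys() {
	LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
	String dynamicPortKeys = containerSpec.getDynamicConfiguration()
		.getString("mesos.dynamic-port-assignment", null);
	if (dynamicPortKeys != null) {
		for (String portKey : dynamicPortKeys.split(",")) {
			tmPortKeys.add(portKey.trim());
		}
	}
	return tmPortKeys;
}
{code}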


> Mesos does not expose TM Prometheus port
> 
>
> Key: FLINK-9703
> URL: https://issues.apache.org/jira/browse/FLINK-9703
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Reporter: Rune Skou Larsen
>Priority: Major
>  Labels: pull-request-available
>
> LaunchableMesosWorker makes Mesos expose these ports for a Task Manager:
> {{private static final String[] TM_PORT_KEYS = {}}
> {{ "taskmanager.rpc.port",}}
> {{ "taskmanager.data.port"};}}
> But when running Prometheus Exporter on a TM, another port needs to be 
> exposed to make Flink's Prometheus endpoint externally scrapable by the 
> Prometheus server. By default this is port 9249, but it is configurable 
> according to:
> [https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter]
>  
> My plan is to make a PR that just adds another config option for Mesos to 
> enable custom ports to be exposed in the provisioned TMs.
> I considered carrying parts of the Metrics config into the Mesos code to 
> automatically map metrics ports in Mesos. But making such a "shortcut" 
> between Flink's metrics and Mesos modules would probably need some sort of 
> integration testing, so I prefer the simple solution of just adding another 
> Mesos config option. But comments are welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6288: [FLINK-9703] [flink-mesos] Expose Prometheus port ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6288#discussion_r202171601
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -332,6 +334,22 @@ public String toString() {
return taskInfo.build();
}
 
+   /**
+* Get port keys representing the TM's configured endpoints. This 
includes mandatory TM endpoints such as
+* data and rpc as well as optionally configured endpoints for services 
such as the prometheus reporter.
+*
+* @return A deterministically ordered Set of port keys to expose from 
the TM container
+*/
+   private Set<String> getPortKeys() {
+   LinkedHashSet<String> tmPortKeys = new 
LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+   containerSpec.getDynamicConfiguration().keySet().stream()
+   .filter(key -> key.endsWith(".port") || 
key.endsWith(".ports"))  // This matches property naming convention
--- End diff --

I agree that simply taking all configuration values which end with `port` 
and `ports` is problematic. What about configuration values whose value is 
needed, e.g. remote ports, and should not be overwritten? For example, 
`jobmanager.rpc.port` is one of these configuration values.

I would rather prefer a Mesos-specific configuration value which we can use 
to define which ports need to be dynamically assigned. For example 
`mesos.dynamic-port-assignment: "metrics.prom.port, taskmanager.rpc.port, 
taskmanager.data.port"`. What do you think?


---


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542132#comment-16542132
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202156901
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List<UserRecord> deaggregateRecords(List<Record> 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to stay within the 2 MB / sec per-shard read limit.
+*
+* @param averageRecordSizeBytes average size in bytes of recently 
fetched records
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Makes sense. Done.


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most realtime scenarios, the average size of the Kinesis record 
> (in bytes) changes depending on the situation, i.e. you could be in a 
> transient scenario where you are reading large-sized records and would hence 
> like to fetch fewer records in each getRecords call (so as to not exceed the 
> 2 MB/sec [per shard 
> limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html]
>  on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be appropriately tuned before making the call. 
> This feature can be behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  
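
For illustration, a minimal sketch of such an adaptive computation (the 
constant name and the cap are assumptions of this sketch, although the 
getRecords API does accept at most 10,000 records per call; 
`maxNumberOfRecordsPerFetch` and `fetchIntervalMillis` are fields of 
ShardConsumer):

{code:java}
// Sketch only: derive maxRecords from the observed average record size so
// that one fetch interval stays within the 2 MB/sec per-shard read limit.
private static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
	int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
	if (averageRecordSizeBytes > 0 && fetchIntervalMillis > 0) {
		// Bytes the shard allows us to read during one fetch interval.
		long bytesPerFetch = SHARD_BYTES_PER_SECOND_LIMIT * fetchIntervalMillis / 1000L;
		adaptedMaxRecordsPerFetch = (int) Math.min(
			bytesPerFetch / averageRecordSizeBytes,
			10_000L); // hard cap of the getRecords API
	}
	return Math.max(1, adaptedMaxRecordsPerFetch);
}
{code}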



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202156901
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List<UserRecord> deaggregateRecords(List<Record> 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to stay within the 2 MB / sec per-shard read limit.
+*
+* @param averageRecordSizeBytes average size in bytes of recently 
fetched records
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Makes sense. Done.


---


[jira] [Closed] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-07-12 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8480.
-
Resolution: Fixed

Merged on master with 42ada8ad9ca28f94d0a0355658330198bbc2b577.

> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-07-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8480:
--
Labels: pull-request-available  (was: )

> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8480) Implement Java API to expose join functionality of TimeBoundedStreamJoinOperator

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542120#comment-16542120
 ] 

ASF GitHub Bot commented on FLINK-8480:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5482


> Implement Java API to expose join functionality of 
> TimeBoundedStreamJoinOperator
> 
>
> Key: FLINK-8480
> URL: https://issues.apache.org/jira/browse/FLINK-8480
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for Interval...

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5482


---


[jira] [Closed] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-9701.
-
Resolution: Implemented

Merged in:
master: f45b7f7ff2

> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542045#comment-16542045
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6313


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6313


---


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542040#comment-16542040
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6313
  
LGTM, nice work! 👍 Besides one comment about closing the backends after 
tests, the PR is ready. This is no big thing so I will just fix it myself 
before merging now.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6313
  
LGTM, nice work! 👍 Besides one comment about closing the backends after 
tests, the PR is ready. This is no big thing so I will just fix it myself 
before merging now.


---


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542039#comment-16542039
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202130806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+   private final StateBackend stateBackend;
+   private final CheckpointStorageLocation checkpointStorageLocation;
+   private final TtlTimeProvider timeProvider;
+
+   private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+   protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+   this.timeProvider = Preconditions.checkNotNull(timeProvider);
+   this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
+   this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+   }
+
+   protected abstract StateBackend createStateBackend();
+
+   private CheckpointStorageLocation createCheckpointStorageLocation() {
+   try {
+   return stateBackend
+   .createCheckpointStorage(new JobID())
+   .initializeLocationForCheckpoint(2L);
+   } catch (IOException e) {
+   throw new RuntimeException("unexpected");
+   }
+   }
+
+   void createAndRestoreKeyedStateBackend() {
+   Environment env = new DummyEnvironment();
+   try {
+   if (keyedStateBackend != null) {
+   keyedStateBackend.dispose();
--- End diff --

There is a problem: the backend is only disposed here and not after each 
test, which leads to some native errors when I run the test. I suggest giving 
this context a `dispose` method and calling it in an `@After` method.
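
A minimal sketch of that suggestion (field names follow the excerpt above; the 
`@After` hook is assumed to live in the concrete test class):

{code:java}
// Sketch only: explicit disposal so each test releases native resources.
void dispose() {
	if (keyedStateBackend != null) {
		keyedStateBackend.dispose();
		keyedStateBackend = null;
	}
}

// In the concrete test class:
@After
public void tearDown() {
	stateBackendTestContext.dispose();
}
{code}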


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202130806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+   private final StateBackend stateBackend;
+   private final CheckpointStorageLocation checkpointStorageLocation;
+   private final TtlTimeProvider timeProvider;
+
+   private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+   protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+   this.timeProvider = Preconditions.checkNotNull(timeProvider);
+   this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
+   this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+   }
+
+   protected abstract StateBackend createStateBackend();
+
+   private CheckpointStorageLocation createCheckpointStorageLocation() {
+   try {
+   return stateBackend
+   .createCheckpointStorage(new JobID())
+   .initializeLocationForCheckpoint(2L);
+   } catch (IOException e) {
+   throw new RuntimeException("unexpected");
+   }
+   }
+
+   void createAndRestoreKeyedStateBackend() {
+   Environment env = new DummyEnvironment();
+   try {
+   if (keyedStateBackend != null) {
+   keyedStateBackend.dispose();
--- End diff --

There is a problem: the backend is only disposed here and not after each 
test, which leads to some native errors when I run the test. I suggest giving 
this context a `dispose` method and calling it in an `@After` method.


---


[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542013#comment-16542013
 ] 

ASF GitHub Bot commented on FLINK-9829:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6321
  
@zentol, hi, there are two ways to contribute to the Flink project:
one is to create a `jira FLINK-` issue, the other is a `[hotfix] XXX` commit.

For example, for fixing some checkstyle issues or doing a code refactoring, 
how should I choose the best way to contribute?




> The wrapper classes be compared by symbol of '==' directly in 
> BigDecSerializer.java
> ---
>
> Key: FLINK-9829
> URL: https://issues.apache.org/jira/browse/FLINK-9829
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> The wrapper classes should be compared with the equals method rather than 
> with the '==' operator directly in BigDecSerializer.java
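
A small, self-contained example of why '==' misbehaves for boxed values and 
BigDecimal (plain JDK, nothing Flink-specific):

{code:java}
import java.math.BigDecimal;

public class WrapperEqualityDemo {
	public static void main(String[] args) {
		Integer a = 127, b = 127;
		System.out.println(a == b);        // true: -128..127 are cached boxes
		Integer c = 128, d = 128;
		System.out.println(c == d);        // false: distinct boxed objects
		System.out.println(c.equals(d));   // true: value comparison

		// BigDecimal instances are never cached, so '==' is almost always wrong:
		BigDecimal x = new BigDecimal("1.0");
		BigDecimal y = new BigDecimal("1.0");
		System.out.println(x == y);        // false
		System.out.println(x.equals(y));   // true (same value and scale)
	}
}
{code}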



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...

2018-07-12 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6321
  
@zentol, hi, there are two ways to contribute to the Flink project:
one is to create a `jira FLINK-` issue, the other is a `[hotfix] XXX` commit.

For example, for fixing some checkstyle issues or doing a code refactoring, 
how should I choose the best way to contribute?




---


[jira] [Commented] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541946#comment-16541946
 ] 

ASF GitHub Bot commented on FLINK-8544:
---

Github user BillLeecn commented on the issue:

https://github.com/apache/flink/pull/5516
  
@dawidwys Of course not. The patch has been updated.


> JSONKeyValueDeserializationSchema throws NPE when message key is null
> -
>
> Key: FLINK-8544
> URL: https://issues.apache.org/jira/browse/FLINK-8544
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Bill Lee
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key 
> without validation.
>  If a message with key == null is read, Flink throws an NPE.
> {code:java}
>   @Override
>   public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
> topic, int partition, long offset) throws IOException {
>   if (mapper == null) {
>   mapper = new ObjectMapper();
>   }
>   ObjectNode node = mapper.createObjectNode();
>   node.set("key", mapper.readValue(messageKey, JsonNode.class)); 
> // messageKey is not validated against null.
>   node.set("value", mapper.readValue(message, JsonNode.class));
> {code}
> The fix is very straightforward.
> {code:java}
>   if (messageKey == null) {
>   node.set("key", null)
>   } else {
>   node.set("key", mapper.readValue(messageKey, 
> JsonNode.class));
>   }
> {code}
> If this fix is appreciated, I will send a pull request.
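
Putting the fix in context, a null-safe version of the whole method could look 
like this (sketch only; the actual patch is in the linked pull request):

{code:java}
@Override
public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic,
		int partition, long offset) throws IOException {
	if (mapper == null) {
		mapper = new ObjectMapper();
	}
	ObjectNode node = mapper.createObjectNode();
	if (messageKey != null) {
		// Kafka message keys are optional, so only parse when present.
		node.set("key", mapper.readValue(messageKey, JsonNode.class));
	}
	node.set("value", mapper.readValue(message, JsonNode.class));
	return node;
}
{code}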



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5516: [FLINK-8544] [Kafka Connector] Handle null message key in...

2018-07-12 Thread BillLeecn
Github user BillLeecn commented on the issue:

https://github.com/apache/flink/pull/5516
  
@dawidwys Of course not. The patch has been updated.


---


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541926#comment-16541926
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202103545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

You're right. I'm just wondering whether you ever want to enable 
checkpointing without a restart strategy. That is, if you set 
`FallbackRestartStrategy`, enable checkpointing, and set `NoRestartStrategy` as 
the server-side `RestartStrategy`, do you then want `FixedRestartStrategy` or 
`NoRestartStrategy`?

On the other hand, you might want to disable restarting for all jobs running 
on your cluster by setting the restart strategy to `NoRestartStrategy`.

Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` 
to `FallbackRestartStrategy` and introduce a new default server-side restart 
strategy `NoOrFixedIfCheckpointingEnabled` which resolves to 
`FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` 
otherwise.

What do you think?
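
For illustration, a sketch of the resolution order proposed here (the class 
`NoOrFixedIfCheckpointingEnabledRestartStrategy` is hypothetical, named after 
this comment; the other types exist in flink-runtime):

{code:java}
// Sketch only: resolve the effective restart strategy on the server side.
RestartStrategy resolveRestartStrategy(
		RestartStrategies.RestartStrategyConfiguration clientConfig,
		RestartStrategy serverSideStrategy,
		boolean isCheckpointingEnabled) {

	if (!(clientConfig instanceof RestartStrategies.FallbackRestartStrategyConfiguration)) {
		// The job explicitly chose a strategy; it always wins.
		return RestartStrategyFactory.createRestartStrategy(clientConfig);
	}
	if (serverSideStrategy instanceof NoOrFixedIfCheckpointingEnabledRestartStrategy) {
		// Hypothetical default: fall back to fixed-delay restarts only
		// when checkpointing is enabled.
		return isCheckpointingEnabled
			? new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0L)
			: new NoRestartStrategy();
	}
	// Otherwise the strategy from flink-conf.yaml applies as-is.
	return serverSideStrategy;
}
{code}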


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>(){
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to Job Manager configuration in WebUI, ensure settings from 
> flink-conf.yaml is there (screenshot attached)
>   
>  5. Go to Job's configuration, see Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  see attached screenshots and jobmanager log (line 1 and 31)
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6283: [FLINK-9143] Use cluster strategy if none was set ...

2018-07-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202103545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

You're right. I'm just wondering whether you ever want to enable 
checkpointing without a restart strategy. That is, if you set 
`FallbackRestartStrategy`, enable checkpointing, and set `NoRestartStrategy` as 
the server-side `RestartStrategy`, do you then want `FixedRestartStrategy` or 
`NoRestartStrategy`?

On the other hand, you might want to disable restarting for all jobs running 
on your cluster by setting the restart strategy to `NoRestartStrategy`.

Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` 
to `FallbackRestartStrategy` and introduce a new default server-side restart 
strategy `NoOrFixedIfCheckpointingEnabled` which resolves to 
`FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` 
otherwise.

What do you think?


---


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541921#comment-16541921
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202103083
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public <N, SV, S extends State, IS extends S> IS createState(
+   public <N, SV, S extends State, IS extends S> IS createInternalState(
--- End diff --

No, timers cannot use state descriptors, as they cannot extend `State`.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202103083
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public <N, SV, S extends State, IS extends S> IS createState(
+   public <N, SV, S extends State, IS extends S> IS createInternalState(
--- End diff --

No, timers cannot use state descriptors, as they cannot extend `State`.


---


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541899#comment-16541899
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202098798
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   
executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Right now null is a bit different from `FallbackRestartStrategy`:
* null - allows falling back to `FixedRestartStrategy` in case checkpointing 
is enabled and `noRestart` was set on the server side
* `FallbackRestartStrategy` - the server-side strategy is always used 
(indifferent to checkpointing)

If we set the `FallbackStrategy` by default, we have two options:
* we either always set `FixedRestartStrategy` if checkpointing is enabled 
and `noRestart` was set on the server side
* or we never automatically fall back to `FixedRestartStrategy`, even in case 
of checkpointing.

What do you think would be the better option? Keep the null, always fall back 
to `FixedRestartStrategy`, or never fall back to it?


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>(){
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to Job Manager configuration in WebUI, ensure settings from 
> flink-conf.yaml is there (screenshot attached)
>   
>  5. Go to Job's configuration, see Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  see attached screenshots and jobmanager log (line 1 and 31)
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

