[jira] [Updated] (FLINK-9029) Getting write permission denied from HDFS after updating flink-1.4.0 to flink-1.4.2

2018-03-21 Thread Mohammad Abareghi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohammad Abareghi updated FLINK-9029:
-
Description: 
*Environment*
 * Flink-1.4.2
 * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
 * Ubuntu 16.04.3 LTS
 * Java 8

 

*Description*

I have a Java job in flink-1.4.0 which writes to a specific path on HDFS. After 
updating to flink-1.4.2 I'm getting the following error from Hadoop, complaining 
that the user doesn't have write permission for the given path:
{code:java}
WARN org.apache.hadoop.security.UserGroupInformation: 
PriviledgedActionException as:xng (auth:SIMPLE) 
cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
{code}
*NOTE*:
 * If I run the same job on flink-1.4.0, the error disappears, regardless of 
which Flink version (1.4.0 or 1.4.2) the job's dependencies use.
 * Also, if I run the job's main method from my IDE with the same parameters, I 
don't get the above error.

*NOTE*:

The problem seems to lie in 
{{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace it with 
{{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the cluster 
and run my job (Flink topology), the error doesn't appear.

  was:
*Environment*
 * Flink-1.4.2
 * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
 * Ubuntu 16.04.3 LTS
 * Java 8

*Description*

I have a Java job in flink-1.4.0 which writes to a specific path on HDFS. After 
updating to flink-1.4.2 I'm getting the following error from Hadoop, complaining 
that the user doesn't have write permission for the given path:
{code:java}
WARN org.apache.hadoop.security.UserGroupInformation: 
PriviledgedActionException as:xng (auth:SIMPLE) 
cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
{code}
*NOTE*:
 * If I run the same job on flink-1.4.0, the error disappears, regardless of 
which Flink version (1.4.0 or 1.4.2) the job's dependencies use.
 * Also, if I run the job's main method from my IDE with the same parameters, 
I don't get the above error.

*NOTE*:

The problem seems to lie in 
{{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace it with 
{{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the cluster 
and run my job (Flink topology), the error doesn't appear.


> Getting write permission denied from HDFS after updating flink-1.4.0 to flink-1.4.2
> ---
>
> Key: FLINK-9029
> URL: https://issues.apache.org/jira/browse/FLINK-9029
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.4.2
> Environment: * Flink-1.4.2 (Flink-1.4.1)
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>Reporter: Mohammad Abareghi
>Priority: Major
>
> *Environment*
>  * Flink-1.4.2
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>  
> *Description*
> I have a Java job in flink-1.4.0 which writes to a specific path on HDFS. 
> After updating to flink-1.4.2 I'm getting the following error from Hadoop, 
> complaining that the user doesn't have write permission for the given path:
> {code:java}
> WARN org.apache.hadoop.security.UserGroupInformation: 
> PriviledgedActionException as:xng (auth:SIMPLE) 
> cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
> {code}
> *NOTE*:
>  * If I run the same job on flink-1.4.0, the error disappears, regardless of 
> which Flink version (1.4.0 or 1.4.2) the job's dependencies use.
>  * Also, if I run the job's main method from my IDE with the same 
> parameters, I don't get the above error.
> *NOTE*:
> The problem seems to lie in 
> {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace it 
> with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the 
> cluster and run my job (Flink topology), the error doesn't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9029) Getting write permission denied from HDFS after updating flink-1.4.0 to flink-1.4.2

2018-03-21 Thread Mohammad Abareghi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407582#comment-16407582
 ] 

Mohammad Abareghi commented on FLINK-9029:
--

[~StephanEwen] Yes. Security is OFF. 

I'll try to remove the Hadoop uber jar ASAP (hopefully later today).

Will drop a comment here. 
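In the meantime, for anyone who wants to try the jar-swap workaround from the 
description, a rough sketch (untested; the job jar name is a placeholder):
{code}
# swap the shaded Hadoop uber jar, restart the cluster, and re-run the job
cd flink-1.4.2
mv lib/flink-shaded-hadoop2-uber-1.4.2.jar /tmp/               # park the 1.4.2 jar
cp ../flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar lib/
./bin/stop-cluster.sh && ./bin/start-cluster.sh                # restart the cluster
./bin/flink run my-hdfs-job.jar                                # placeholder job jar
{code}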

> Getting write permission denied from HDFS after updating flink-1.4.0 to flink-1.4.2
> ---
>
> Key: FLINK-9029
> URL: https://issues.apache.org/jira/browse/FLINK-9029
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.4.2
> Environment: * Flink-1.4.2 (Flink-1.4.1)
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>Reporter: Mohammad Abareghi
>Priority: Major
>
> *Environment*
>  * Flink-1.4.2
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>  
> *Description*
> I have a Java job in flink-1.4.0 which writes to a specific path on HDFS. 
> After updating to flink-1.4.2 I'm getting the following error from Hadoop, 
> complaining that the user doesn't have write permission for the given path:
> {code:java}
> WARN org.apache.hadoop.security.UserGroupInformation: 
> PriviledgedActionException as:xng (auth:SIMPLE) 
> cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
> {code}
> *NOTE*:
>  * If I run the same job on flink-1.4.0, the error disappears, regardless of 
> which Flink version (1.4.0 or 1.4.2) the job's dependencies use.
>  * Also, if I run the job's main method from my IDE with the same 
> parameters, I don't get the above error.
> *NOTE*:
> The problem seems to lie in 
> {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace it 
> with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the 
> cluster and run my job (Flink topology), the error doesn't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176006260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
	protected TypeSerializer<T> serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

good catch, will fix that upon merging


---


[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407588#comment-16407588
 ] 

ASF GitHub Bot commented on FLINK-9034:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176006260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
	protected TypeSerializer<T> serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

good catch, will fix that upon merging
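
For readers following along, a minimal sketch of the lazy-initialization 
pattern the diff introduces (the class and method names here are illustrative, 
not the actual {{StateDescriptor}} code):
{code:java}
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Sketch only: keep the (now serializable) type information across
// serialization and derive the serializer lazily from it, so it can be
// created with the real ExecutionConfig instead of an empty one.
public abstract class LazySerializerHolder<T> implements java.io.Serializable {

	@Nullable
	protected TypeSerializer<T> serializer;   // may also be set eagerly

	@Nullable
	protected TypeInformation<T> typeInfo;    // retained so the serializer can
	                                          // be rebuilt after deserialization

	public TypeSerializer<T> getSerializer(ExecutionConfig config) {
		if (serializer == null) {
			// a real implementation would also handle typeInfo == null
			serializer = typeInfo.createSerializer(config);
		}
		return serializer;
	}
}
{code}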


> State Descriptors drop TypeInformation on serialization
> ---
>
> Key: FLINK-9034
> URL: https://issues.apache.org/jira/browse/FLINK-9034
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction  {
> private final ValueStateDescriptor<MyType> descr = new 
> ValueStateDescriptor<>("state name", MyType.class);
> private ValueState<MyType> state;
> @Override
> public void open() {
> state = getRuntimeContext().getValueState(descr);
> }
> }
> {code}
> The problem is that the state descriptor drops the type information and 
> creates a serializer before serialization, as part of shipping the function to 
> the cluster. To do that, it initializes the serializer with an empty 
> execution config, making serialization inconsistent.
> This is mainly an artifact from the days when dropping the type information 
> before shipping was necessary, because the type info was not serializable. It 
> now is, and we can fix that bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9037) Test flake Kafka09ITCase#testCancelingEmptyTopic

2018-03-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9037:
---

 Summary: Test flake Kafka09ITCase#testCancelingEmptyTopic
 Key: FLINK-9037
 URL: https://issues.apache.org/jira/browse/FLINK-9037
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Stephan Ewen


{code}
Test 
testCancelingEmptyTopic(org.apache.flink.streaming.connectors.kafka.Kafka09ITCase)
 failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 6 
milliseconds
{code}

Full log: https://api.travis-ci.org/v3/job/356044885/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407601#comment-16407601
 ] 

Till Rohrmann commented on FLINK-9026:
--

I think it is often better to close/unregister resources in the scope where 
they have been opened/registered. This makes resource management much easier.

But then we should at least close the {{TaskManagerMetricGroup}} when the 
{{TaskExecutor}} is shut down.
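
A minimal sketch of that (simplified and partly assumed signatures, not the 
actual {{TaskExecutor}} code):
{code:java}
// Sketch only: close the TaskManagerMetricGroup when the TaskExecutor shuts
// down. Closing the group also unregisters any metrics still registered
// under it, e.g. for tasks that never reached a final state.
@Override
public CompletableFuture<Void> postStop() {
	taskManagerMetricGroup.close();
	return super.postStop();
}
{code}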

> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8979) Extend Kafka end-to-end tests to run with different versions

2018-03-21 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8979:
--

Assignee: Tzu-Li (Gordon) Tai

> Extend Kafka end-to-end tests to run with different versions
> 
>
> Key: FLINK-8979
> URL: https://issues.apache.org/jira/browse/FLINK-8979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current {{Kafka}} end-to-end test only runs with Kafka 0.10. We should 
> extend the test to also run with
> * Kafka 0.8
> * Kafka 0.9
> * Kafka 0.11
> Additionally, we should change the test job to not be embarrassingly parallel 
> by introducing a shuffle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8519) FileAlreadyExistsException on Start Flink Session

2018-03-21 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-8519.
--
Resolution: Invalid

Thanks [~yew1eb] for the clarification. Let me close this JIRA ticket then.

> FileAlreadyExistsException on Start Flink Session 
> --
>
> Key: FLINK-8519
> URL: https://issues.apache.org/jira/browse/FLINK-8519
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Hai Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> *steps to reproduce:*
>  1. build flink from source , git commit: c1734f4
>  2. run script:
> source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher;
> export YARN_CONF_DIR=/path/hadoop/etc/hadoop;
> export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop;
> export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; 
> /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D 
> yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java 
> %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm 
> job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d
>  
>  *error info:*
> 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> Error while running the Flink Yarn session.
>  java.lang.reflect.UndeclaredThrowableException
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786)
>  Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>  ... 2 more
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user 
> already exists as dir; cannot create link here
>  at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244)
>  at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:334)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:161)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161)
>  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:485)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
>  ... 7 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5733

[FLINK-8975] [test] Add resume from savepoint end-to-end test

## What is the purpose of the change

This pull request adds an end-to-end test that verifies resuming a job from 
a savepoint.

The complete end-to-end test consists of the following:

1. The `StateMachineExample` is used for the end-to-end test.
2. A separate job is run to generate Kafka events for the state machine.
3. After the state machine job has run for a while, we take a savepoint.
4. The state machine example job is cancelled and resumed with the savepoint.

None of the above steps should produce errors or unexpected output from the 
state machine job; if any of them does, the end-to-end test fails.

## Brief change log

- Add a separate main class for the Kafka events generator job
- Add `test_resume_savepoint.sh` test script

## Verifying this change

This PR itself introduces a new test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5733


commit 529e060cb05fd723b8656dcc9ef48f8011282dd8
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:25:37Z

[FLINK-8975] [test] Add Kafka events generator job for StateMachineExample

commit 213638b4194cceccd597e90c78631a6c6a191abb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:32:51Z

[FLINK-8975] [test] Add resume from savepoint end-to-end test




---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407616#comment-16407616
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5733

[FLINK-8975] [test] Add resume from savepoint end-to-end test

## What is the purpose of the change

This pull request adds an end-to-end test that verifies resuming a job from 
a savepoint.

The complete end-to-end test consists of the following:

1. The `StateMachineExample` is used for the end-to-end test.
2. A separate job is run to generate Kafka events for the state machine.
3. After the state machine job has run for a while, we take a savepoint.
4. The state machine example job is cancelled and resumed with the savepoint.

None of the above steps should produce errors or unexpected output from the 
state machine job; if any of them does, the end-to-end test fails.

## Brief change log

- Add a separate main class for the Kafka events generator job
- Add `test_resume_savepoint.sh` test script

## Verifying this change

This PR itself introduces a new test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5733


commit 529e060cb05fd723b8656dcc9ef48f8011282dd8
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:25:37Z

[FLINK-8975] [test] Add Kafka events generator job for StateMachineExample

commit 213638b4194cceccd597e90c78631a6c6a191abb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:32:51Z

[FLINK-8975] [test] Add resume from savepoint end-to-end test




> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the general purpose testing job 
> FLINK-8971 with failures disabled for that.
> The end-to-end test should do the following:
> * Submit FLINK-8971 job
> * Verify that the savepoint is there
> * Cancel job and resume from savepoint
> * Verify that job could be resumed
> * Use different StateBackends: RocksDB incremental async/sync, RocksDB full 
> async/sync, FsStateBackend async/sync



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-03-21 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407618#comment-16407618
 ] 

Nico Kruber commented on FLINK-8899:


It should, however, also not be at "minor" priority, as this may affect user 
experience - as do all the other mentioned exceptions (which should get JIRA 
tickets). Every exception in the log will potentially make users (and us) 
investigate it and burn a lot of time.

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Minor
>  Labels: flip-6
>
> Occasionally, running a simple word count like this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176011796
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

An alternative to this:
Query the "read-records" metric via the REST API, and only proceed after the 
job has processed a certain number of records.
This, however, requires adding an extra dependency, such as jq, for parsing 
the response.
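
A rough sketch of what that could look like (the job-details endpoint and its 
per-vertex "read-records" metric come from the REST API; the port and record 
threshold are example values):

    # poll the job's read-records metric instead of sleeping for a fixed time
    NUM_RECORDS=200
    while true; do
      READ=$(curl -s "http://localhost:8081/jobs/${STATE_MACHINE_JOB}" \
        | jq '[.vertices[].metrics["read-records"]] | max')
      if [ "${READ}" != "null" ] && [ "${READ}" -ge "${NUM_RECORDS}" ]; then
        break
      fi
      echo "Waiting for the job to process at least ${NUM_RECORDS} records..."
      sleep 1
    done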


---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407621#comment-16407621
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176011796
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

An alternative to this:
Query the "read-records" metric via the REST API, and only proceed after the 
job has processed a certain number of records.
This, however, requires adding an extra dependency, such as jq, for parsing 
the response.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> User usual

[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8899:
-
Priority: Major  (was: Minor)

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: flip-6
>
> Occasionally, running a simple word count like this 
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>   at o

[jira] [Commented] (FLINK-8909) pyflink.sh not working with yarn

2018-03-21 Thread Hitesh Tiwari (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407633#comment-16407633
 ] 

Hitesh Tiwari commented on FLINK-8909:
--

Thanks. Configuring `python.dc.tmp.dir` worked for us.
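
For anyone hitting the same error, a sketch of the setting (the path is just an 
example; it needs to be accessible to the user running the Flink TaskManagers, 
here 'yarn'):
{code}
# flink-conf.yaml -- example value only
python.dc.tmp.dir: /var/tmp/flink_dc
{code}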


> pyflink.sh not working with yarn
> 
>
> Key: FLINK-8909
> URL: https://issues.apache.org/jira/browse/FLINK-8909
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Hitesh Tiwari
>Priority: Blocker
>
> Hi,
> I want to run the Python application from pyflink.sh in yarn-cluster mode. I 
> added "-m yarn-cluster -yn 1" to pyflink.sh, so my updated pyflink.sh 
> executes the command below:
> "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1  -v 
> "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"
>  Running pyflink.sh:
> ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py
> While running, I get the error below:
> java.lang.Exception: The user defined 'open()' method caused an exception: An 
> error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ... 1 more
> 03/09/2018 11:20:23 Job execution switched to status FAILING.
>  java.lang.Exception: The user defined 'open()' method caused an exception: 
> An error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:

[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9038:

Description: 
https://travis-ci.org/apache/flink/jobs/355821305

{code}
Test 
testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
 failed with:
java.lang.AssertionError: Found a file 
/home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
 with a prohibited string (one of [Exception, Started 
SelectChannelConnector@0.0.0.0:8081]). Excerpts:
[
2018-03-20 15:44:20,533 ERROR 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
on heartbeat
]
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
at 
org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
at 
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
{code}

  was:https://travis-ci.org/apache/flink/jobs/355821305


> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {cod

[jira] [Created] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9038:
---

 Summary: YARNSessionCapacitySchedulerITCase fails on travis
 Key: FLINK-9038
 URL: https://issues.apache.org/jira/browse/FLINK-9038
 Project: Flink
  Issue Type: Bug
  Components: Tests, YARN
Affects Versions: 1.5.0
Reporter: Chesnay Schepler
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9038:

Description: https://travis-ci.org/apache/flink/jobs/355821305

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Changes look good @yanghua. Merging this PR.


---


[jira] [Closed] (FLINK-8909) pyflink.sh not working with yarn

2018-03-21 Thread Hitesh Tiwari (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hitesh Tiwari closed FLINK-8909.

Resolution: Not A Problem

> pyflink.sh not working with yarn
> 
>
> Key: FLINK-8909
> URL: https://issues.apache.org/jira/browse/FLINK-8909
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Hitesh Tiwari
>Priority: Blocker
>
> Hi,
> I want to run the Python application from pyflink.sh in yarn-cluster mode. I 
> added "-m yarn-cluster -yn 1" to pyflink.sh, so my updated pyflink.sh 
> executes the command below:
> "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1  -v 
> "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"
>  Running pyflink.sh:
> ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py
> While running, I get the error below:
> java.lang.Exception: The user defined 'open()' method caused an exception: An 
> error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ... 1 more
> 03/09/2018 11:20:23 Job execution switched to status FAILING.
>  java.lang.Exception: The user defined 'open()' method caused an exception: 
> An error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.uti

[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407636#comment-16407636
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Changes look good @yanghua. Merging this PR.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407641#comment-16407641
 ] 

Till Rohrmann commented on FLINK-9038:
--

I think this one is a duplicate of FLINK-8899, or vice versa.

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {code}
> Test 
> testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
>  failed with:
> java.lang.AssertionError: Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
>  with a prohibited string (one of [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]). Excerpts:
> [
> 2018-03-20 15:44:20,533 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> ]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
>   at 
> org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.Forke

[jira] [Closed] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9038.

Resolution: Duplicate

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {code}
> Test 
> testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
>  failed with:
> java.lang.AssertionError: Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
>  with a prohibited string (one of [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]). Excerpts:
> [
> 2018-03-20 15:44:20,533 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> ]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
>   at 
> org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}



--
This message was sent by Atlassian JIRA
(v7

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176019938
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state 
transitions,
--- End diff --

when do the jobs shut down if no error occurs?


---


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176020327
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

you could use the log4j reporter and grep the logs.
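
For reference, a sketch of what wiring up that reporter could look like, 
assuming the Slf4j reporter shipped in flink-metrics-slf4j (the reporter name 
and the metric grepped for at the end are arbitrary examples; in the test 
scripts the equivalent keys would go into flink-conf.yaml):
{code:java}
import org.apache.flink.configuration.Configuration;

public class Slf4jReporterSketch {

    public static void main(String[] args) {
        // Configuration keys for Flink's metric reporters; with these set,
        // the reporter periodically dumps all metrics into the logs.
        Configuration config = new Configuration();
        config.setString("metrics.reporters", "slf4j");
        config.setString("metrics.reporter.slf4j.class",
                "org.apache.flink.metrics.slf4j.Slf4jReporter");
        config.setString("metrics.reporter.slf4j.interval", "10 SECONDS");

        // Pass `config` to the cluster under test; the test can then grep the
        // TaskManager logs for a metric such as numRecordsIn instead of sleeping.
    }
}
{code}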


---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407670#comment-16407670
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176020327
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

you could use the log4j reporter and grep the logs.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> User usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the gen

[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407669#comment-16407669
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176019938
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state 
transitions,
--- End diff --

when do the jobs shut down i

[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union

2018-03-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407683#comment-16407683
 ] 

Fabian Hueske commented on FLINK-9031:
--

Adding relevant information from the mail thread:

[~StephanEwen] suggested
{quote}
To diagnose that, can you please check the following:
   - Change the Person data type to be immutable (final fields, no setters, set 
fields in constructor instead). Does that make the problem go away?
   - Change the Person data type to not be a POJO by adding a dummy field that 
is never used and does not have a getter/setter. 
Does that make the problem go away?
If either of those is the case, it must be a mutability bug somewhere in either 
accidental object reuse or accidental serializer sharing.
{quote}
 
Making the Person object immutable solved the problem.
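
For illustration, a minimal sketch of the immutable variant from the first 
suggestion (the fields are hypothetical; the real Person class is attached to 
this issue). Note that without a public no-argument constructor and setters 
the type presumably no longer qualifies as a Flink POJO, so Flink falls back 
to its generic serializer, which would explain why the change alters the 
observed behavior:
{code:java}
// Immutable variant: final fields, no setters, fields set in the constructor.
public final class Person {

    private final String field1;
    private final int age;

    public Person(String field1, int age) {
        this.field1 = field1;
        this.age = age;
    }

    public String getField1() {
        return field1;
    }

    public int getAge() {
        return age;
    }
}
{code}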

> DataSet Job result changes when adding rebalance after union
> 
>
> Key: FLINK-9031
> URL: https://issues.apache.org/jira/browse/FLINK-9031
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime, Optimizer
>Affects Versions: 1.3.1
>Reporter: Fabian Hueske
>Priority: Critical
> Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt
>
>
> A user [reported this issue on the user mailing 
> list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E].
> {quote}I am using Flink 1.3.1 and I have found a strange behavior when running 
> the following logic:
>  # Read data from file and store into DataSet
>  # Split dataset in two, by checking if "field1" of POJOs is empty or not, so 
> that the first dataset contains only elements with non empty "field1", and 
> the second dataset will contain the other elements.
>  # Each dataset is then grouped, one by "field1" and the other by another 
> field, and subsequently reduced.
>  # The 2 datasets are merged together by union.
>  # The final dataset is written as json.
> What I expected, from the output, was to find only one element with a 
> specific value of "field1" because:
>  # Reducing the first dataset grouped by "field1" should generate only one 
> element with a specific value of "field1".
>  # The second dataset should contain only elements with empty "field1".
>  # Making a union of them should not duplicate any record.
> This does not happen. When I read the generated JSONs I see some duplicate 
> (non-empty) values of "field1".
>  Strangely this does not happen when the union between the two datasets is 
> not computed. In this case the first dataset produces elements only with 
> distinct values of "field1", while the second dataset produces only records with 
> empty field "value1".
> {quote}
> The user has not enabled object reuse.
> Later he reports that injecting a rebalance() after the union resolves the 
> problem. I had a look at the execution plans for 
> both cases (attached to this issue) but could not identify a problem.
> Hence I assume this might be an issue with the runtime code, but we need to 
> look deeper into this. The user also provided an example program consisting 
> of two classes which are attached to the issue as well.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5734

[FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the 
TaskExecutor is shut down

## What is the purpose of the change

We should close the `TaskManagerMetricGroup` when the `TaskExecutor` is 
shut down.

## Brief change log

- close the `TaskManagerMetricGroup` when the `TaskExecutor` is shut down.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK_9026

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5734


commit 94ecbc87e53b4fff306a864971f164c765122194
Author: sihuazhou 
Date:   2018-03-21T09:46:31Z

close the TaskManagerMetricGroup when the TaskExecutor is shut down




---


[GitHub] flink issue #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricGroup wh...

2018-03-21 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5734
  
CC: @tillrohrmann 


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407686#comment-16407686
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5734

[FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the 
TaskExecutor is shut down

## What is the purpose of the change

We should close the `TaskManagerMetricGroup` when the `TaskExecutor` is 
shut down.

## Brief change log

- close the `TaskManagerMetricGroup` when the `TaskExecutor` is shut down.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK_9026

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5734


commit 94ecbc87e53b4fff306a864971f164c765122194
Author: sihuazhou 
Date:   2018-03-21T09:46:31Z

close the TaskManagerMetricGroup when the TaskExecutor is shut down




> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407688#comment-16407688
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5734
  
CC: @tillrohrmann 


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027456
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state 
transitions,
--- End diff --

when the script exits, a cleanup hook shuts down the cluster. It also 
parses the logs for any unexpected errors; if there is one, the test fails.


---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407691#comment-16407691
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027456
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state 
transitions,
--- End diff --

when the script exits, a c

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027636
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

That's a good idea! I'll give this approach a try.


---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407692#comment-16407692
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027636
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more 
slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

That's a good idea! I'll give this approach a try.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> User usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the ge

[jira] [Created] (FLINK-9039) Broken link to Hadoop Setup Guide in docs

2018-03-21 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9039:
--

 Summary: Broken link to Hadoop Setup Guide in docs
 Key: FLINK-9039
 URL: https://issues.apache.org/jira/browse/FLINK-9039
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2
Reporter: Florian Schmidt


On 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/dependencies.html]
 under the section Hadoop Dependencies there is a link to "Hadoop Setup Guide" 
which links to 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/hadoop.html],
 which in turn does not exist.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

this method never throws exceptions
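
To make that point concrete, a toy model (not Flink's actual classes) of a 
close() that recursively closes its children and swallows failures internally, 
so that callers in a shutdown path such as `TaskExecutor#postStop` need no 
surrounding try/catch:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class MetricGroupCloseSketch {

    static final class ToyMetricGroup {
        private final String name;
        private final List<ToyMetricGroup> children = new ArrayList<>();

        ToyMetricGroup(String name) {
            this.name = name;
        }

        ToyMetricGroup addChild(String childName) {
            ToyMetricGroup child = new ToyMetricGroup(childName);
            children.add(child);
            return child;
        }

        // Never propagates exceptions: a failure while closing one child is
        // logged and swallowed, and the remaining children are still closed.
        void close() {
            for (ToyMetricGroup child : children) {
                try {
                    child.close();
                } catch (RuntimeException e) {
                    System.err.println("Failed to close " + child.name + ": " + e);
                }
            }
            System.out.println("closed " + name);
        }
    }

    public static void main(String[] args) {
        ToyMetricGroup taskManagerGroup = new ToyMetricGroup("taskmanager");
        taskManagerGroup.addChild("task-1");
        taskManagerGroup.addChild("task-2");

        // In a TaskExecutor-style shutdown path this single call suffices.
        taskManagerGroup.close();
    }
}
{code}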


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407696#comment-16407696
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028213
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -265,7 +265,11 @@ class TaskManager(
   case t: Exception => log.error("FileCache did not shutdown 
properly.", t)
 }
 
-taskManagerMetricGroup.close()
+try {
+  taskManagerMetricGroup.close()
--- End diff --

same as above


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028213
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -265,7 +265,11 @@ class TaskManager(
   case t: Exception => log.error("FileCache did not shutdown 
properly.", t)
 }
 
-taskManagerMetricGroup.close()
+try {
+  taskManagerMetricGroup.close()
--- End diff --

same as above


---


[GitHub] flink pull request #5716: [FLINK-9022][state] fix resource release in Stream...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5716


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407695#comment-16407695
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

this method never throws exceptions


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5726: [FLINK-9028][flip6] perform parameters checking be...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5726


---


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176028721
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

The only trap I see is `kafka_cleanup`, which shuts down ZK and Kafka. 
What about the state machine though?


---


[jira] [Resolved] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9022.
--
Resolution: Fixed

Fixed via
master: f9df13c5058f194a5c686b9b753345d9226fc87a
1.5.0: 27189d8058d6c3bc00dbc8409f40bedbccf01ac5

> fix resource close in 
> `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
> ---
>
> Key: FLINK-9022
> URL: https://issues.apache.org/jira/browse/FLINK-9022
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We have the following code in 
> {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
> incorrect:
> {code}
> } catch (Exception ex) {
>   // cleanup if something went wrong before results got published.
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend); // this should close 
> operatorStateBackend
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
>   IOUtils.closeQuietly(rawKeyedStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   throw new Exception("Exception while creating 
> StreamOperatorStateContext.", ex);
> }
> {code}
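
For clarity, the corrected shape of that cleanup block, reconstructed from the 
description above (the second {{if}} must close {{operatorStateBackend}}, and 
the duplicated {{rawOperatorStateInputs}} block is dropped):

{code:java}
} catch (Exception ex) {
	// cleanup if something went wrong before results got published.
	if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
		IOUtils.closeQuietly(keyedStatedBackend);
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
		IOUtils.closeQuietly(operatorStateBackend);
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
		IOUtils.closeQuietly(rawKeyedStateInputs);
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
		IOUtils.closeQuietly(rawOperatorStateInputs);
	}
	throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
{code}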



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9028) flip6 should check config before starting cluster

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407698#comment-16407698
 ] 

ASF GitHub Bot commented on FLINK-9028:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5726


> flip6 should check config before starting cluster
> -
>
> Key: FLINK-9028
> URL: https://issues.apache.org/jira/browse/FLINK-9028
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> In flip6, we should perform parameters checking before starting cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407697#comment-16407697
 ] 

ASF GitHub Bot commented on FLINK-9022:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5716


> fix resource close in 
> `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
> ---
>
> Key: FLINK-9022
> URL: https://issues.apache.org/jira/browse/FLINK-9022
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We have the following code in 
> {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
> incorrect:
> {code}
> } catch (Exception ex) {
>   // cleanup if something went wrong before results got published.
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend); // this should close 
> operatorStateBackend
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
>   IOUtils.closeQuietly(rawKeyedStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   throw new Exception("Exception while creating 
> StreamOperatorStateContext.", ex);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8980) End-to-end test: BucketingSink

2018-03-21 Thread Florian Schmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt reassigned FLINK-8980:
--

Assignee: Florian Schmidt

> End-to-end test: BucketingSink
> --
>
> Key: FLINK-8980
> URL: https://issues.apache.org/jira/browse/FLINK-8980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In order to verify the {{BucketingSink}}, we should add an end-to-end test 
> which verifies that the {{BucketingSink}} does not lose data under failures.
> An idea would be to have a CountUp job which simply counts up a counter which 
> is persisted. The emitted values will be written to disk by the 
> {{BucketingSink}}. Now we should randomly kill Flink processes (cluster 
> entrypoint and TaskExecutors) to simulate failures. Even after these 
> failures, the written files should contain the correct sequence of numbers.
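
As a starting point, a minimal sketch of such a CountUp source (illustrative 
only, assuming Flink's {{ListCheckpointed}} interface; the class name is made 
up, not from the code base):

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CountUpSource extends RichSourceFunction<Long>
		implements ListCheckpointed<Long> {

	private volatile boolean running = true;
	private long counter;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		while (running) {
			// emit under the checkpoint lock so counter and output stay consistent
			synchronized (ctx.getCheckpointLock()) {
				ctx.collect(counter++);
			}
		}
	}

	@Override
	public void cancel() {
		running = false;
	}

	@Override
	public List<Long> snapshotState(long checkpointId, long timestamp) {
		return Collections.singletonList(counter);
	}

	@Override
	public void restoreState(List<Long> state) {
		for (Long c : state) {
			counter = c;
		}
	}
}
{code}

After a kill and recovery, the sequence written by the {{BucketingSink}} should 
continue from the restored counter, modulo the sink's handling of pending and 
in-progress files.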



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407700#comment-16407700
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176028721
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

The only trap I see is `kafka_cleanup`, which shuts down ZK and Kafka. 
What about the state machine though?

[jira] [Resolved] (FLINK-9028) flip6 should check config before starting cluster

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9028.
--
Resolution: Fixed

Fixed via
master:
38aa863d5a710b283b5c9b2eb9225d6fb9cc0c70
7c952dd3a75bc64d10bf9be12e405bbc349422b1

1.5.0:
fecc19088b36fc4c8bca5ff39ba756f8fd71
c6f91334b67d589f0c17ed75c9dbcbaedaf8ba51

> flip6 should check config before starting cluster
> -
>
> Key: FLINK-9028
> URL: https://issues.apache.org/jira/browse/FLINK-9028
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> In flip6, we should perform parameters checking before starting cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176029020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

I intended to catch some possible `RuntimeException`... I will just remove 
the `try catch`.


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407703#comment-16407703
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176029020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

I intended to catch some possible `RuntimeException`... I will just remove 
the `try catch`.


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176029439
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
--- End diff --

@zentol the `kafka_cleanup` trap also includes this, which shuts down the 
Flink cluster and checks logs for errors.


---


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407705#comment-16407705
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176029439
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
--- End diff --

@zentol the `kafka_cleanup` trap also includes this, which shuts down the 
Flink cluster and checks logs for errors.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the general purpose testing job 
> FLINK-8971 with failures disabled for that.
> The end-to-end test should do the following:
> * Submit FLINK-8971 job
> * Verify that the savepoint is there
> * Cancel job and resume from savepoint
> * Verify that job could be resumed
> * Use different StateBackends: RocksDB incremental async/sync, RocksDB full 
> async/sync, FsStateBackend async/sync
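
Regarding the last bullet, a sketch of how those backend variants can be 
selected on the job (placeholder paths; assuming the 1.5-era constructors, 
where the boolean toggles incremental checkpoints resp. asynchronous 
snapshots):

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendVariants {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// RocksDB with incremental checkpoints (false = full checkpoints)
		env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

		// or: FsStateBackend with asynchronous snapshots (false = synchronous)
		// env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));
	}
}
{code}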



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5717: [FLINK-9020][E2E Tests] Use separate modules per t...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5717#discussion_r176032646
  
--- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml ---
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>parent-child-classloading-test</artifactId>
--- End diff --

* add `flink` prefix
* add `_${scala.binary.version}` suffix
* add a `<name>` of `flink-parent-child-classloading-test` (this is what 
allows us to omit the scala stuff when listing child modules)


---


[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407724#comment-16407724
 ] 

ASF GitHub Bot commented on FLINK-9020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5717#discussion_r176032646
  
--- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml ---
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>parent-child-classloading-test</artifactId>
--- End diff --

* add `flink` prefix
* add `_${scala.binary.version}` suffix
* add a `<name>` of `flink-parent-child-classloading-test` (this is what 
allows us to omit the scala stuff when listing child modules)


> Move test projects of end-to-end tests in separate modules
> --
>
> Key: FLINK-9020
> URL: https://issues.apache.org/jira/browse/FLINK-9020
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I would like to propose to move each test case in the end-to-end tests into 
> it's own module. Reason is that currently we are building all jars for the 
> tests from one pom.xml, which makes it hard to have specific tests for 
> certain build types (e.g. examples derived from the flink quickstart 
> archetype).
> For the current state this would mean
> - change packaging from flink-end-to-end-tests from jar to pom
> - refactor the classloader example to be in its own module



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407726#comment-16407726
 ] 

Till Rohrmann commented on FLINK-9010:
--

Does your Yarn cluster have enough resources to run this program? If your 
program consists of 2 operators and you run it with DOP 400, then it should 
require 800 slots (logical). If the two operators are in the same slot sharing 
group, then two logical slots will be deployed to the same {{TaskExecutor}} 
slot. Thus, I'm not sure whether this is an actual problem here. 

Please verify and if this is the case, then let's close this issue.
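
For the record, the slot arithmetic above as a small illustrative calculation:

{code:java}
public class SlotMath {
	public static void main(String[] args) {
		int operators = 2;
		int parallelism = 400;                       // DOP of the job
		int logicalSlots = operators * parallelism;  // 800 logical slots
		// with both operators in one slot sharing group, each physical slot
		// hosts one subtask of each operator:
		int physicalSlots = parallelism;             // 400 TaskExecutor slots
		System.out.println(logicalSlots + " logical / " + physicalSlots + " physical");
	}
}
{code}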

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>  

[GitHub] flink issue #5717: [FLINK-9020][E2E Tests] Use separate modules per testcase

2018-03-21 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5717
  
Thanks @zentol! I addressed your comments and will squash if approved


---


[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407732#comment-16407732
 ] 

ASF GitHub Bot commented on FLINK-9020:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5717
  
Thanks @zentol! I addressed your comments and will squash if approved


> Move test projects of end-to-end tests in separate modules
> --
>
> Key: FLINK-9020
> URL: https://issues.apache.org/jira/browse/FLINK-9020
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I would like to propose to move each test case in the end-to-end tests into 
> it's own module. Reason is that currently we are building all jars for the 
> tests from one pom.xml, which makes it hard to have specific tests for 
> certain build types (e.g. examples derived from the flink quickstart 
> archetype).
> For the current state this would mean
> - change packaging from flink-end-to-end-tests from jar to pom
> - refactor the classloader example to be in its own module



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9040:
---

 Summary: JobVertex#setMaxParallelism does not validate argument
 Key: FLINK-9040
 URL: https://issues.apache.org/jira/browse/FLINK-9040
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.5.0
Reporter: Chesnay Schepler


{code}
/**
* Sets the maximum parallelism for the task.
*
* @param maxParallelism The maximum parallelism to be set. must be between 1 
and Short.MAX_VALUE.
*/
public void setMaxParallelism(int maxParallelism) {
this.maxParallelism = maxParallelism;
}
{code}
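
A minimal sketch of the missing check, assuming Flink's {{Preconditions}} 
utility (the bounds come from the Javadoc above; not the actual fix):

{code:java}
import static org.apache.flink.util.Preconditions.checkArgument;

public void setMaxParallelism(int maxParallelism) {
	checkArgument(maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE,
		"The max parallelism must be at least 1 and at most Short.MAX_VALUE, but was %s.",
		maxParallelism);
	this.maxParallelism = maxParallelism;
}
{code}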



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5729: [FLINK-7343][kafka-tests] Fix test at-least-once test ins...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5729
  
Sweet, nice to see this fixed.

Code looks good, +1 to merge!


---


[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407737#comment-16407737
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5729
  
Sweet, nice to see this fixed.

Code looks good, +1 to merge!


> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> As reported by [~till.rohrmann] in 
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test 
> instability with 
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing 
> the data to files and potentially causing data loses on killing Kafka brokers 
> in the tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5725
  
Thank you for trying to help improve Flink.

I think, however, this patch is not necessary. The method does not require 
synchronization, it should be correct as it is.


---


[jira] [Assigned] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument

2018-03-21 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9040:
-

Assignee: Sihua Zhou

> JobVertex#setMaxParallelism does not validate argument
> ---
>
> Key: FLINK-9040
> URL: https://issues.apache.org/jira/browse/FLINK-9040
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Sihua Zhou
>Priority: Minor
>
> {code}
> /**
> * Sets the maximum parallelism for the task.
> *
> * @param maxParallelism The maximum parallelism to be set. must be between 1 
> and Short.MAX_VALUE.
> */
> public void setMaxParallelism(int maxParallelism) {
>   this.maxParallelism = maxParallelism;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407739#comment-16407739
 ] 

ASF GitHub Bot commented on FLINK-8394:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5725
  
Thank you for trying to help improve Flink.

I think, however, this patch is not necessary. The method does not require 
synchronization, it should be correct as it is.


> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as done on 
> other methods.
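
For reference, an illustrative stand-in for what the report asks (a simplified 
sketch, not the actual benchmark code; per the review comment above, the 
synchronization is considered unnecessary, and the issue was closed):

{code:java}
import java.util.concurrent.CompletableFuture;

class ReceiverThreadSketch extends Thread {
	private volatile boolean running = true;
	private final CompletableFuture<Long> expectedRecord = new CompletableFuture<>();

	// synchronized like "the other methods", as the report suggests
	public synchronized void shutdown() {
		running = false;
		interrupt();
		expectedRecord.complete(0L);
	}
}
{code}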



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5725: [FLINK-8394] Lack of synchronization accessing exp...

2018-03-21 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5725


---


[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...

2018-03-21 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5725
  
@StephanEwen OK, let me close this PR, thanks.


---


[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407742#comment-16407742
 ] 

ASF GitHub Bot commented on FLINK-8394:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5725


> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as done on 
> other methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407741#comment-16407741
 ] 

ASF GitHub Bot commented on FLINK-8394:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5725
  
@StephanEwen OK, let me close this PR, thanks.


> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as done on 
> other methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9041:
---

 Summary: Refactor StreamTaskTest to not use scala and akka
 Key: FLINK-9041
 URL: https://issues.apache.org/jira/browse/FLINK-9041
 Project: Flink
  Issue Type: Improvement
  Components: Streaming, Tests
Affects Versions: 1.5.0
Reporter: Chesnay Schepler






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed FLINK-8394.
---
Resolution: Won't Fix

> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as done on 
> other methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask

2018-03-21 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-7897:
---

Assignee: vinoyang

> Consider using nio.Files for file deletion in TransientBlobCleanupTask
> --
>
> Key: FLINK-7897
> URL: https://issues.apache.org/jira/browse/FLINK-7897
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> nio.Files#delete() provides better clue as to why the deletion may fail:
> https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)
> Depending on the potential exception (FileNotFound), the call to 
> localFile.exists() may be skipped.
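
A minimal sketch of that suggestion (method and class names are illustrative, 
not the actual {{TransientBlobCleanupTask}} code):

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;

class BlobDeletionSketch {
	static void tryDelete(File localFile) {
		try {
			Files.delete(localFile.toPath());
		} catch (NoSuchFileException e) {
			// file already gone: the localFile.exists() pre-check becomes unnecessary
		} catch (IOException e) {
			// typed cause (permissions, non-empty directory, ...) available for logging
		}
	}
}
{code}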



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5718
  
Having a custom timeout like @zentol suggests could be helpful in the long 
run.

For now, though, I personally agree that the timeouts are counterproductive 
(I actually never understood what they are supposed to solve). I 
prefer a deadlocking test to stay around, so I can jstack to the process, 
attach a debugger, pull a heap dump, whatever. The only environment where we 
cannot do that is the CI server, which has a timeout and thread dump already...
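    
    For illustration, one way such a configurable timeout could look (a sketch 
only, not zentol's actual proposal; the system property name is made up):
    ```java
    import org.junit.Rule;
    import org.junit.rules.Timeout;

    public class SomeKafkaITCase {
        // overridable per environment, e.g. -Dtest.timeout.seconds=600 on CI,
        // or a very large value locally so a debugger can be attached
        @Rule
        public final Timeout timeout = Timeout.seconds(Long.getLong("test.timeout.seconds", 120L));
    }
    ```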


---


[jira] [Created] (FLINK-9042) Port ResumeCheckpointManuallyITCase to flip6

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9042:
---

 Summary: Port ResumeCheckpointManuallyITCase to flip6
 Key: FLINK-9042
 URL: https://issues.apache.org/jira/browse/FLINK-9042
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Tests
Affects Versions: 1.5.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407745#comment-16407745
 ] 

ASF GitHub Bot commented on FLINK-8073:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5718
  
Having a custom timeout like @zentol suggests could be helpful in the long 
run.

For now, though, I personally agree that the timeouts are counterproductive 
(I actually never understood what they are supposed to solve). I 
prefer a deadlocking test to stay around, so I can jstack to the process, 
attach a debugger, pull a heap dump, whatever. The only environment where we 
cannot do that is the CI server, which has a timeout and thread dump already...


> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5712: [FLINK-9011] YarnResourceManager spamming log file at INF...

2018-03-21 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5712
  
cc @tillrohrmann could you have a look at this? 


---


[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407749#comment-16407749
 ] 

ASF GitHub Bot commented on FLINK-9011:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5712
  
cc @tillrohrmann could you have a look at this? 


> YarnResourceManager spamming log file at INFO level
> ---
>
> Key: FLINK-9011
> URL: https://issues.apache.org/jira/browse/FLINK-9011
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For every requested resource, the {{YarnResourceManager}} spams the log with 
> log-level INFO and the following messages:
> {code}
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
> 2018-03-16 03:41:20,190 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka

2018-03-21 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9041:
-

Assignee: Sihua Zhou

> Refactor StreamTaskTest to not use scala and akka
> -
>
> Key: FLINK-9041
> URL: https://issues.apache.org/jira/browse/FLINK-9041
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Sihua Zhou
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-03-21 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407756#comment-16407756
 ] 

vinoyang commented on FLINK-8946:
-

[~till.rohrmann] I just traced this issue through the source code. For Flip-6, 
in *TaskExecutor* the *TaskManagerMetricGroup* instance is injected as a 
constructor parameter, and that instance is created in 
*TaskManagerRunner.startTaskManager*. In *TaskExecutor* the instance is never 
closed, so it seems this is not a problem in Flip-6. 

> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManager that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveal that these line of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over re-association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5705: [FLINK-8968][state]Fix native resource leak caused by Rea...

2018-03-21 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5705
  
Hi @tillrohrmann could you please have a look at this? This PR covers multiple 
bugs that should be addressed... I think Stefan may be too busy with the 
testing work on 1.5 currently...


---


[jira] [Commented] (FLINK-8968) Fix native resource leak caused by ReadOptions

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407760#comment-16407760
 ] 

ASF GitHub Bot commented on FLINK-8968:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5705
  
Hi @tillrohrmann could you please have a look at this? This PR covers multiple 
bugs that should be addressed... I think Stefan may be too busy with the 
testing work on 1.5 currently...


> Fix native resource leak caused by ReadOptions 
> ---
>
> Key: FLINK-8968
> URL: https://issues.apache.org/jira/browse/FLINK-8968
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the creation of ReadOptions out of the loop in 
> {{RocksDBFullSnapshotOperation.writeKVStateMetaData()}}.
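
A minimal sketch of that fix, assuming the RocksDB JNI {{ReadOptions}} is 
{{AutoCloseable}} as in recent rocksdbjni versions (names illustrative, not 
the actual Flink code):

{code:java}
import org.rocksdb.ReadOptions;

class ReadOptionsScopeSketch {
	static void writeKvStateMetaData(int numKvStates) {
		// one native object for the whole loop instead of one per iteration
		try (ReadOptions readOptions = new ReadOptions()) {
			for (int i = 0; i < numKvStates; i++) {
				// reuse readOptions for each state's iterator here
			}
		} // native memory released exactly once
	}
}
{code}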



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...

2018-03-21 Thread yew1eb
Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r176047433
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext 
createTaskExecutorContext(
require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-   log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-   log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
-
final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-   log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+   if (log.isDebugEnabled()) {
--- End diff --

Sorry, I forgot to tell you that the `log.debug()` method internally calls 
`isDebugEnabled()`.
You can see the `org.slf4j.Logger`'s log4j adapter class:
```
  //org.slf4j.impl.Log4jLoggerAdapter.java
...
  public void debug(String format, Object arg) {
if (logger.isDebugEnabled()) {
  FormattingTuple ft = MessageFormatter.format(format, arg);
  logger.log(FQCN, Level.DEBUG, ft.getMessage(), ft.getThrowable());
}
  }
...
```


---


[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407763#comment-16407763
 ] 

ASF GitHub Bot commented on FLINK-9011:
---

Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r176047433
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext 
createTaskExecutorContext(
require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-   log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-   log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
-
final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-   log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+   if (log.isDebugEnabled()) {
--- End diff --

Sorry, I forgot to tell you that the `log.debug()` method internally calls 
`isDebugEnabled()`.
You can see this in the log4j adapter class for `org.slf4j.Logger`:
```
  //org.slf4j.impl.Log4jLoggerAdapter.java
...
  public void debug(String format, Object arg) {
if (logger.isDebugEnabled()) {
  FormattingTuple ft = MessageFormatter.format(format, arg);
  logger.log(FQCN, Level.DEBUG, ft.getMessage(), ft.getThrowable());
}
  }
...
```


> YarnResourceManager spamming log file at INFO level
> ---
>
> Key: FLINK-9011
> URL: https://issues.apache.org/jira/browse/FLINK-9011
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For every requested resource, the {{YarnResourceManager}} spams the log with 
> log-level INFO and the following messages:
> {code}
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
> 2018-03-16 03:41:20,190 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.proper

[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407770#comment-16407770
 ] 

ASF GitHub Bot commented on FLINK-9011:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r176048790
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext 
createTaskExecutorContext(
require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-   log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-   log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
-
final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-   log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+   if (log.isDebugEnabled()) {
--- End diff --

I know; as I said in the previous comment, `each of them would do the same 
judgement inside the debug method`. Here I add a single explicit check so 
that if the log level is above DEBUG, all of these calls can be skipped at 
once.
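
For illustration, a minimal self-contained sketch of the guarded block under 
discussion (class and method names are hypothetical):
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KeytabLogging {
    private static final Logger log = LoggerFactory.getLogger(KeytabLogging.class);

    // A single explicit guard replaces the per-call level check inside each
    // log.debug(...), so levels above DEBUG skip the whole block at once.
    static void logRemotePaths(String keytabPath, String krb5Path) {
        if (log.isDebugEnabled()) {
            log.debug("TM:remote keytab path obtained {}", keytabPath);
            log.debug("TM:remote krb5 path obtained {}", krb5Path);
        }
    }
}
```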




> YarnResourceManager spamming log file at INFO level
> ---
>
> Key: FLINK-9011
> URL: https://issues.apache.org/jira/browse/FLINK-9011
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For every requested resource, the {{YarnResourceManager}} spams the log with 
> log-level INFO and the following messages:
> {code}
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
> 2018-03-16 03:41:20,190 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...

2018-03-21 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r176048790
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext 
createTaskExecutorContext(
require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
-   log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-   log.info("TM:remote keytab principal obtained {}", 
remoteKeytabPrincipal);
-
final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-   log.info("TM:remote yarn conf path obtained {}", 
remoteYarnConfPath);
-
final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
-   log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+   if (log.isDebugEnabled()) {
--- End diff --

I know; as I said in the previous comment, `each of them would do the same 
judgement inside the debug method`. Here I add a single explicit check so 
that if the log level is above DEBUG, all of these calls can be skipped at 
once.




---


[jira] [Reopened] (FLINK-8699) Fix concurrency problem in rocksdb full checkpoint

2018-03-21 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reopened FLINK-8699:
---

Reopen because the problem still exists.

> Fix concurrency problem in rocksdb full checkpoint
> --
>
> Key: FLINK-8699
> URL: https://issues.apache.org/jira/browse/FLINK-8699
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In a full checkpoint, `kvStateInformation` is not a copy and can be 
> modified while writeKVStateMetaData() is executing... This can lead to 
> serious problems.
> {code}
> private void writeKVStateMetaData() throws IOException {
>   // ...
> for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
> RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
>   stateBackend.kvStateInformation.entrySet()) {
> }
>   //...
> }
> {code}
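
For illustration, a minimal self-contained sketch of the fix idea: iterate 
over a point-in-time copy taken under a lock, so concurrent registrations can 
no longer race with the snapshot (the lock and value types are stand-ins, not 
Flink's actual fields):
{code:java}
import java.util.HashMap;
import java.util.Map;

class StateRegistrySnapshot {
    private final Object lock = new Object();                       // stand-in lock
    private final Map<String, Object> kvStateInformation = new HashMap<>();

    // The snapshot thread iterates this stable copy, so writeKVStateMetaData()
    // no longer observes concurrent changes to the live registry.
    Map<String, Object> copyForSnapshot() {
        synchronized (lock) {
            return new HashMap<>(kvStateInformation);
        }
    }
}
{code}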



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9018) Unclosed snapshotCloseableRegistry in RocksDBKeyedStateBackend#FullSnapshotStrategy#performSnapshot

2018-03-21 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou resolved FLINK-9018.
---
Resolution: Fixed

> Unclosed snapshotCloseableRegistry in 
> RocksDBKeyedStateBackend#FullSnapshotStrategy#performSnapshot
> ---
>
> Key: FLINK-9018
> URL: https://issues.apache.org/jira/browse/FLINK-9018
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ted Yu
>Assignee: Sihua Zhou
>Priority: Minor
>
> {code}
>   final CloseableRegistry snapshotCloseableRegistry = new 
> CloseableRegistry();
>   if (kvStateInformation.isEmpty()) {
> if (LOG.isDebugEnabled()) {
>   LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed 
> state at {}. Returning null.",
> timestamp);
> }
> return DoneFuture.of(SnapshotResult.empty());
>   }
> {code}
> If the method returns in the above if block, snapshotCloseableRegistry is not 
> closed.
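
For illustration, a sketch of one possible fix mirroring the quoted snippet: 
close the registry on the early-return path as well:
{code:java}
final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();

if (kvStateInformation.isEmpty()) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
            timestamp);
    }
    // Release the registry before returning so it is not leaked on this path.
    snapshotCloseableRegistry.close();
    return DoneFuture.of(SnapshotResult.empty());
}
{code}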



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...

2018-03-21 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5735

 [FLINK-9036] [core] Add default values to State Descriptors via suppliers

**This PR is based on #5732 and only the last two commits are relevant for 
this PR**

## What is the purpose of the change

Earlier versions had a default value in `ValueState`. We dropped that, 
because the value would have to be duplicated on each access, to be safe 
against side effects when using mutable types.

This pull request re-adds the feature, but uses a supplier/factory function 
to create the default value on access. This is more efficient than copying a 
shared default value on access.
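
For illustration, a hedged sketch of how such a supplier-based default might 
look from the user side; the exact overload is an assumption based on this 
description, not a confirmed signature:
```
// The default is produced by a Supplier on each access to a non-existing
// value, so no shared instance has to be copied (overload is hypothetical).
ValueStateDescriptor<Long> counter =
    new ValueStateDescriptor<>("count", Long.class, () -> 0L);
```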

## Brief change log

  - The `StateDescriptor` produces default values through an optional 
`Supplier` function.
  - For backwards compatibility, the mode of passing a value directly is kept. 
The value is wrapped in a `DefaultValueFactory`, which implements the legacy 
functionality using a serializer to copy the value on each access.

## Verifying this change

  - This change adds a set of unit tests
  - The change modifies one example program (`StateMachineExample`). 
Running that example shows how the change works end-to-end.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
state_default_values

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5735


commit 1c756f0d6dfe71114a97a1b9effaf321b9da063b
Author: Stephan Ewen 
Date:   2018-03-20T14:29:12Z

[hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit 186008c609635f99e4123912a632a4e068d3c532
Author: Stephan Ewen 
Date:   2018-03-20T14:36:19Z

[hotfix] [core] Demockitofy state descriptor tests

commit 98666506c193feffb3952d9d424d3aa924f40318
Author: Stephan Ewen 
Date:   2018-03-20T14:44:27Z

[hotfix] [core] Make State Descriptors consistently use Preconditions 
instead of Objects.

commit 1b286e4adbb5369df41c902bd161f5e854b862b8
Author: Stephan Ewen 
Date:   2018-03-20T15:22:12Z

[FLINK-9034] [core] StateDescriptor does not throw away TypeInformation 
upon serialization.

Throwing away TypeInformation upon serialization was previously done 
because the type
information was not serializable. Now that it is serializable, we can (and 
should) keep
it to provide consistent user experience, where all serializers respect the 
ExecutionConfig.

commit 6064b3d49d75d40ea69a65f5e38724bf9119b526
Author: Stephan Ewen 
Date:   2018-03-20T15:46:13Z

[hotfix] [core] Consolidate serializer duplication tests in 
StateDescriptorTest where possible

commit a29b128f4f1bec49f1403aa21889e5890dc589ee
Author: Stephan Ewen 
Date:   2018-03-20T16:16:06Z

[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

commit f19a4721acae62f8ba578c7cb235b6a917f3a258
Author: Stephan Ewen 
Date:   2018-03-20T17:04:24Z

[FLINK-9036] [core] Add default values to State Descriptors via suppliers

commit 6d7757017f52f7c3fd7cbe99d05f1de63186d12d
Author: Stephan Ewen 
Date:   2018-03-20T18:51:02Z

[FLINK-9036] [examples] Use state default value in StateMachineExample




---


[jira] [Commented] (FLINK-9036) Add default value via suppliers

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407830#comment-16407830
 ] 

ASF GitHub Bot commented on FLINK-9036:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5735

 [FLINK-9036] [core] Add default values to State Descriptors via suppliers

**This PR is based on #5732 and only the last two commits are relevant for 
this PR**

## What is the purpose of the change

Earlier versions had a default value in `ValueState`. We dropped that, 
because the value would have to be duplicated on each access, to be safe 
against side effects when using mutable types.

This pull request re-adds the feature, but uses a supplier/factory function 
to create the default value on access. This is more efficient than copying a 
shared default value on access.

## Brief change log

  - The `StateDescriptor` produces default values through an optional 
`Supplier` function.
  - For backwards compatibility, the mode of passing a value directly is kept. 
The value is wrapped in a `DefaultValueFactory`, which implements the legacy 
functionality using a serializer to copy the value on each access.

## Verifying this change

  - This change adds a set of unit tests
  - The change modifies one example program (`StateMachineExample`). 
Running that example shows how the change works end-to-end.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
state_default_values

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5735


commit 1c756f0d6dfe71114a97a1b9effaf321b9da063b
Author: Stephan Ewen 
Date:   2018-03-20T14:29:12Z

[hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit 186008c609635f99e4123912a632a4e068d3c532
Author: Stephan Ewen 
Date:   2018-03-20T14:36:19Z

[hotfix] [core] Demockitofy state descriptor tests

commit 98666506c193feffb3952d9d424d3aa924f40318
Author: Stephan Ewen 
Date:   2018-03-20T14:44:27Z

[hotfix] [core] Make State Descriptors consistently use Preconditions 
instead of Objects.

commit 1b286e4adbb5369df41c902bd161f5e854b862b8
Author: Stephan Ewen 
Date:   2018-03-20T15:22:12Z

[FLINK-9034] [core] StateDescriptor does not throw away TypeInformation 
upon serialization.

Throwing away TypeInformation upon serialization was previously done 
because the type
information was not serializable. Now that it is serializable, we can (and 
should) keep
it to provide consistent user experience, where all serializers respect the 
ExecutionConfig.

commit 6064b3d49d75d40ea69a65f5e38724bf9119b526
Author: Stephan Ewen 
Date:   2018-03-20T15:46:13Z

[hotfix] [core] Consolidate serializer duplication tests in 
StateDescriptorTest where possible

commit a29b128f4f1bec49f1403aa21889e5890dc589ee
Author: Stephan Ewen 
Date:   2018-03-20T16:16:06Z

[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

commit f19a4721acae62f8ba578c7cb235b6a917f3a258
Author: Stephan Ewen 
Date:   2018-03-20T17:04:24Z

[FLINK-9036] [core] Add default values to State Descriptors via suppliers

commit 6d7757017f52f7c3fd7cbe99d05f1de63186d12d
Author: Stephan Ewen 
Date:   2018-03-20T18:51:02Z

[FLINK-9036] [examples] Use state default value in StateMachineExample




> Add default value via suppliers
> ---
>
> Key: FLINK-9036
> URL: https://issues.apache.org/jira/browse/FLINK-9036
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Earlier versions had a default value in

[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union

2018-03-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407842#comment-16407842
 ] 

Stephan Ewen commented on FLINK-9031:
-

That's good input, thanks.



> DataSet Job result changes when adding rebalance after union
> 
>
> Key: FLINK-9031
> URL: https://issues.apache.org/jira/browse/FLINK-9031
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime, Optimizer
>Affects Versions: 1.3.1
>Reporter: Fabian Hueske
>Priority: Critical
> Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt
>
>
> A user [reported this issue on the user mailing 
> list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E].
> {quote}I am using Flink 1.3.1 and I have found a strange behavior on running 
> the following logic:
>  # Read data from file and store into DataSet
>  # Split dataset in two, by checking if "field1" of POJOs is empty or not, so 
> that the first dataset contains only elements with non empty "field1", and 
> the second dataset will contain the other elements.
>  # Each dataset is then grouped by, one by "field1" and other by another 
> field, and subsequently reduced.
>  # The 2 datasets are merged together by union.
>  # The final dataset is written as json.
> What I was expected, from output, was to find only one element with a 
> specific value of "field1" because:
>  # Reducing the first dataset grouped by "field1" should generate only one 
> element with a specific value of "field1".
>  # The second dataset should contain only elements with empty "field1".
>  # Making an union of them should not duplicate any record.
> This does not happen. When I read the generated JSONs I see some duplicate 
> (non-empty) values of "field1".
>  Strangely this does not happen when the union between the two datasets is 
> not computed. In this case the first dataset produces elements only with 
> distinct values of "field1", while second dataset produces only records with 
> empty field "value1".
> {quote}
> The user has not enabled object reuse.
> Later he reports that injecting a rebalance() after the union makes the 
> problem disappear. I had a look at the execution plans for both cases 
> (attached to this issue) but could not identify a problem.
> Hence I assume this might be an issue with the runtime code, but we need to 
> look deeper into this. The user also provided an example program consisting 
> of two classes, which are attached to the issue as well.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5729: [FLINK-7343][kafka-tests] Fix test at-least-once test ins...

2018-03-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5729
  
merging.


---


[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407853#comment-16407853
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5729
  
merging.


> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> As reported by [~till.rohrmann] in 
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test 
> instability with 
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing 
> the data to files, potentially causing data losses when killing Kafka brokers 
> in the tests.
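
For illustration, the standard Kafka broker property that controls this; 
forcing eager flushes in the test brokers (the value below is an assumption 
about a possible mitigation, not the applied fix) would reduce such losses:
{code}
# flush to disk after every message instead of relying on the OS/scheduler
log.flush.interval.messages=1
{code}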



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407852#comment-16407852
 ] 

ASF GitHub Bot commented on FLINK-8073:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5718
  
merging.


> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests

2018-03-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5718
  
merging.


---


[jira] [Created] (FLINK-9043) Flink recover from checkpoint like Spark Streaming

2018-03-21 Thread godfrey johnson (JIRA)
godfrey johnson created FLINK-9043:
--

 Summary: Flink recover from checkpoint like Spark Streaming 
 Key: FLINK-9043
 URL: https://issues.apache.org/jira/browse/FLINK-9043
 Project: Flink
  Issue Type: New Feature
Reporter: godfrey johnson


I know a Flink job can recover from a checkpoint with a restart strategy, but 
it cannot recover on startup the way Spark Streaming jobs can.

Every time, a submitted Flink job is regarded as a new job, whereas a Spark 
Streaming job can detect the checkpoint directory first and then recover from 
the latest successful checkpoint. Flink can only recover after the job has 
failed first and is then retried according to its restart strategy.

So, would Flink support recovering from the checkpoint directly in a new job?
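
For reference, the existing manual equivalent via the CLI uses the 
`-s`/`--fromSavepoint` flag (the path and jar name below are hypothetical):
{code}
bin/flink run -s hdfs:///flink/savepoints/savepoint-abc123 my-streaming-job.jar
{code}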

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5708: [FLINK-8984][network] Drop taskmanager.exactly-once.block...

2018-03-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5708
  
merging.


---


[jira] [Commented] (FLINK-8984) Disabling credit based flow control deadlocks Flink on checkpoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407856#comment-16407856
 ] 

ASF GitHub Bot commented on FLINK-8984:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5708
  
merging.


> Disabling credit based flow control deadlocks Flink on checkpoint
> -
>
> Key: FLINK-8984
> URL: https://issues.apache.org/jira/browse/FLINK-8984
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This is a configuration issue. There are two options: 
> taskmanager.network.credit-based-flow-control.enabled
> and
> taskmanager.exactly-once.blocking.data.enabled
> If we disable the first one but keep the default value for the second one, 
> deadlocks will occur. I think we can safely drop the second config value 
> altogether and always use the blocking BarrierBuffer for credit-based flow 
> control and the spilling BarrierBuffer for non-credit-based flow control.
> cc [~zjwang]
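
For illustration, the failing combination in flink-conf.yaml terms (option 
names as quoted above):
{code}
taskmanager.network.credit-based-flow-control.enabled: false
# taskmanager.exactly-once.blocking.data.enabled is left at its default,
# which still assumes credit-based flow control -> checkpoints deadlock
{code}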



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5717: [FLINK-9020][E2E Tests] Use separate modules per testcase

2018-03-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5717
  
merging.


---


[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407858#comment-16407858
 ] 

ASF GitHub Bot commented on FLINK-9020:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5717
  
merging.


> Move test projects of end-to-end tests in separate modules
> --
>
> Key: FLINK-9020
> URL: https://issues.apache.org/jira/browse/FLINK-9020
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I would like to propose moving each test case in the end-to-end tests into 
> its own module. The reason is that currently we are building all jars for the 
> tests from one pom.xml, which makes it hard to have specific tests for 
> certain build types (e.g. examples derived from the flink quickstart 
> archetype).
> For the current state this would mean (see the sketch after this list):
> - changing the packaging of flink-end-to-end-tests from jar to pom
> - refactoring the classloader example into its own module
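
A hedged sketch of what the parent pom change could look like (the module 
name is an assumption):
{code:xml}
<!-- flink-end-to-end-tests/pom.xml -->
<packaging>pom</packaging>
<modules>
    <!-- one module per test case, e.g. the classloader example -->
    <module>flink-parent-child-classloading-test</module>
</modules>
{code}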



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407863#comment-16407863
 ] 

Till Rohrmann commented on FLINK-8946:
--

I think we should close the {{TaskManagerMetricGroup}} at some point. E.g. in 
{{TaskExecutor#postStop}} or in the caller of {{TaskExecutor}}.
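
For illustration, a minimal sketch of that suggestion (the field name and the 
exact postStop() signature are assumptions, not an actual patch):
{code:java}
@Override
public CompletableFuture<Void> postStop() {
    // Close the metric group exactly once when the TaskExecutor shuts down,
    // so registered reporters are cleanly deregistered.
    taskManagerMetricGroup.close();
    return super.postStop();
}
{code}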

> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManagers that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

