[jira] [Updated] (FLINK-9029) Getting write permission from HDFS after updating flink-1.4.0 to flink-1.4.2
[ https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mohammad Abareghi updated FLINK-9029: - Description: *Environment* * Flink-1.4.2 * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}} * Ubuntu 16.04.3 LTS * Java 8 *Description* I have a Java job in flink-1.4.0 which writes to HDFS at a specific path. After updating to flink-1.4.2 I'm getting the following error from Hadoop, complaining that the user doesn't have write permission to the given path: {code:java} WARN org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:xng (auth:SIMPLE) cause:org.apache.hadoop.security.AccessControlException: Permission denied: user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x {code} *NOTE*: * If I run the same job on flink-1.4.0, the error disappears regardless of which Flink version (1.4.0 or 1.4.2) the job's dependencies use. * If I run the job's main method from my IDE and pass the same parameters, I don't get the above error. *NOTE*: The problem seems to be in {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace it with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the cluster and run my job (Flink topology), the error doesn't appear.
> Getting write permission from HDFS after updating flink-1.4.0 to flink-1.4.2 > --- > > Key: FLINK-9029 > URL: https://issues.apache.org/jira/browse/FLINK-9029 > Project: Flink > Issue Type: Bug > Affects Versions: 1.4.1, 1.4.2 > Environment: * Flink-1.4.2 (Flink-1.4.1) > * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}} > * Ubuntu 16.04.3 LTS > * Java 8 > Reporter: Mohammad Abareghi > Priority: Major
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
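With Hadoop security off (SIMPLE authentication), the client-side user named in the error is typically taken from the HADOOP_USER_NAME environment variable when set, and otherwise from the operating-system login; a difference in how the two shaded uber jars resolve this identity would explain why the IDE run and the cluster run behave differently. Below is a JDK-only sketch of that resolution order — `EffectiveUser` is a hypothetical helper for illustration, not Hadoop's actual UserGroupInformation code.

```java
// Sketch of the user-resolution order Hadoop's SIMPLE authentication
// typically applies when security is off: an explicit HADOOP_USER_NAME
// environment variable overrides the operating-system login.
// Hypothetical helper, not Hadoop code.
public class EffectiveUser {
    static String resolve(String hadoopUserNameEnv, String osUser) {
        if (hadoopUserNameEnv != null && !hadoopUserNameEnv.isEmpty()) {
            return hadoopUserNameEnv;  // explicit override wins
        }
        return osUser;                 // fallback: the OS login
    }

    public static void main(String[] args) {
        String user = resolve(System.getenv("HADOOP_USER_NAME"),
                              System.getProperty("user.name"));
        System.out.println("effective HDFS user would be: " + user);
    }
}
```

Printing this value under both classpaths (IDE vs. cluster with each uber jar) is a quick way to see which identity each run would submit to the NameNode.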
[jira] [Commented] (FLINK-9029) Getting write permission from HDFS after updating flink-1.4.0 to flink-1.4.2
[ https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407582#comment-16407582 ] Mohammad Abareghi commented on FLINK-9029: -- [~StephanEwen] Yes, security is OFF. I'll try to remove the Hadoop uber jar ASAP (hopefully later today) and will drop a comment here. > Getting write permission from HDFS after updating flink-1.4.0 to flink-1.4.2 > --- > > Key: FLINK-9029 > URL: https://issues.apache.org/jira/browse/FLINK-9029 > Project: Flink > Issue Type: Bug > Affects Versions: 1.4.1, 1.4.2 > Reporter: Mohammad Abareghi > Priority: Major
[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176006260 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java --- @@ -77,18 +80,22 @@ /** The serializer for the type. May be eagerly initialized in the constructor, * or lazily once the type is serialized or an ExecutionConfig is provided. */ + @Nullable protected TypeSerializer serializer; + /** The type information describing the value type. Only used to lazily create the serializer +* and dropped during serialization */ + @Nullable --- End diff -- good catch, will fix that upon merging ---
[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization
[ https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407588#comment-16407588 ] ASF GitHub Bot commented on FLINK-9034: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5732#discussion_r176006260 > State Descriptors drop TypeInformation on serialization > --- > > Key: FLINK-9034 > URL: https://issues.apache.org/jira/browse/FLINK-9034 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0, 1.4.2 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Priority: Major > Fix For: 1.6.0 > > > The following code currently causes problems: > {code} > public class MyFunction extends RichMapFunction<MyType, MyType> { > private final ValueStateDescriptor<MyType> descr = new > ValueStateDescriptor<>("state name", MyType.class); > private ValueState<MyType> state; > @Override > public void open(Configuration parameters) { > state = getRuntimeContext().getState(descr); > } > } > {code} > The problem is that the state descriptor drops the type information and > creates a serializer before serialization, as part of shipping the function to > the cluster. To do that, it initializes the serializer with an empty > execution config, making serialization inconsistent. > This is mainly an artifact from the days when dropping the type information > before shipping was necessary, because the type info was not serializable. It now is, and we can fix that bug.
[jira] [Created] (FLINK-9037) Test flake Kafka09ITCase#testCancelingEmptyTopic
Stephan Ewen created FLINK-9037: --- Summary: Test flake Kafka09ITCase#testCancelingEmptyTopic Key: FLINK-9037 URL: https://issues.apache.org/jira/browse/FLINK-9037 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.5.0 Reporter: Stephan Ewen {code} Test testCancelingEmptyTopic(org.apache.flink.streaming.connectors.kafka.Kafka09ITCase) failed with: org.junit.runners.model.TestTimedOutException: test timed out after 6 milliseconds {code} Full log: https://api.travis-ci.org/v3/job/356044885/log.txt
[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it
[ https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407601#comment-16407601 ] Till Rohrmann commented on FLINK-9026: -- I think it is often better to close/unregister resources in the scope where they have been opened/registered. This makes resource management much easier. But then we should at least close the {{TaskManagerMetricGroup}} when the {{TaskExecutor}} is shut down. > Unregister finished tasks from TaskManagerMetricGroup and close it > -- > > Key: FLINK-9026 > URL: https://issues.apache.org/jira/browse/FLINK-9026 > Project: Flink > Issue Type: Bug > Components: Metrics > Affects Versions: 1.5.0, 1.6.0 > Reporter: Till Rohrmann > Assignee: Sihua Zhou > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they > have reached a final state. Moreover, we should close the > {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or > let the caller do this.
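The principle in the comment above — the scope that registers a resource should also unregister it — is what try-with-resources automates. A minimal sketch, using a hypothetical string registry rather than Flink's actual MetricGroup API:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of "unregister in the scope that registered": binding the
// unregistration to an AutoCloseable makes the cleanup automatic and
// exception-safe. Hypothetical registry, not Flink's metric API.
public class ScopedRegistration {
    static final Set<String> registry = new HashSet<>();

    static AutoCloseable register(String taskName) {
        registry.add(taskName);                  // "open" here...
        return () -> registry.remove(taskName);  // ...and "close" bound to the same scope
    }

    static void runTask(String taskName) {
        try (AutoCloseable handle = register(taskName)) {
            // task body: the registration lives exactly as long as this
            // block, even if the body throws
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        runTask("task-1");
        System.out.println("registry empty after task: " + registry.isEmpty());
    }
}
```

A container-level group (like the TaskManagerMetricGroup in the discussion) then only needs one closing site of its own, e.g. the executor's shutdown hook, since per-task entries already clean themselves up.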
[jira] [Assigned] (FLINK-8979) Extend Kafka end-to-end tests to run with different versions
[ https://issues.apache.org/jira/browse/FLINK-8979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-8979: -- Assignee: Tzu-Li (Gordon) Tai > Extend Kafka end-to-end tests to run with different versions > > > Key: FLINK-8979 > URL: https://issues.apache.org/jira/browse/FLINK-8979 > Project: Flink > Issue Type: Sub-task > Components: Tests > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.5.0 > > > The current {{Kafka}} end-to-end test only runs with Kafka 0.10. We should > extend the test to also run with > * Kafka 0.8 > * Kafka 0.9 > * Kafka 0.11 > Additionally we should change the test job to not be embarrassingly parallel > by introducing a shuffle.
[jira] [Closed] (FLINK-8519) FileAlreadyExistsException on Start Flink Session
[ https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-8519. -- Resolution: Invalid Thanks [~yew1eb] for the clarification. Let me close this jira ticket then. > FileAlreadyExistsException on Start Flink Session > -- > > Key: FLINK-8519 > URL: https://issues.apache.org/jira/browse/FLINK-8519 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Hai Zhou >Priority: Blocker > Fix For: 1.5.0 > > > *steps to reproduce:* > 1. build flink from source , git commit: c1734f4 > 2. run script: > source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher; > export YARN_CONF_DIR=/path/hadoop/etc/hadoop; > export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop; > export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; > /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D > yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java > %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm > job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d > > *error infos:* > 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - > Error while running the Flink Yarn session. 
> java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > ... 2 more > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user > already exists as dir; cannot create link here > at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244) > at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:334) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:161) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161) > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) > at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:485) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384) > ... 7 more
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5733 [FLINK-8975] [test] Add resume from savepoint end-to-end test ## What is the purpose of the change This pull request adds an end-to-end test that verifies resuming a job from a savepoint. The complete end-to-end test consists of the following: 1. The `StateMachineExample` is used for the end-to-end test 2. A separate job to generate Kafka events for the state machine is run 3. After the state machine job has run for a while, a savepoint is taken 4. The state machine example job is cancelled and resumed from the savepoint None of the above steps should result in errors or in any output from the state machine job; if they do, the end-to-end test fails. ## Brief change log - Add a separate main class for the Kafka events generator job - Add `test_resume_savepoint.sh` test script ## Verifying this change This PR itself introduces a new test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8975 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5733.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5733 commit 529e060cb05fd723b8656dcc9ef48f8011282dd8 Author: Tzu-Li (Gordon) Tai Date: 2018-03-21T08:25:37Z [FLINK-8975] [test] Add Kafka events generator job for StateMachineExample commit 213638b4194cceccd597e90c78631a6c6a191abb Author: Tzu-Li (Gordon) Tai Date: 2018-03-21T08:32:51Z [FLINK-8975] [test] Add resume from savepoint end-to-end test ---
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407616#comment-16407616 ] ASF GitHub Bot commented on FLINK-8975: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5733 [FLINK-8975] [test] Add resume from savepoint end-to-end test > End-to-end test: Resume from savepoint > -- > > Key: FLINK-8975 > URL: https://issues.apache.org/jira/browse/FLINK-8975 > Project: Flink > Issue Type: Sub-task > Components: Tests > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.5.0 > > > Users usually take a savepoint and want to resume from it. In order to verify > that Flink supports this feature, we should add an end-to-end test which > scripts this behavior. We should use the general purpose testing job from > FLINK-8971 with failures disabled for that. > The end-to-end test should do the following: > * Submit the FLINK-8971 job > * Verify that the savepoint is there > * Cancel the job and resume from the savepoint > * Verify that the job could be resumed > * Use different StateBackends: RocksDB incremental async/sync, RocksDB full > async/sync, FsStateBackend async/sync
[jira] [Commented] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407618#comment-16407618 ] Nico Kruber commented on FLINK-8899: It should, however, also not be in "minor" priority as this may affect user experience - as do all the other mentioned exceptions (which should get JIRA tickets). Every exception in the log will potentially make the users (and us) investigate it and burn a lot of time. > Submitting YARN job with FLIP-6 may lead to > ApplicationAttemptNotFoundException > --- > > Key: FLINK-8899 > URL: https://issues.apache.org/jira/browse/FLINK-8899 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Priority: Minor > Labels: flip-6 > > Occasionally, running a simple word count as this > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > leads to an {{ApplicationAttemptNotFoundException}} in the logs: > {code} > 2018-03-08 16:18:08,507 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to > FINISHED. > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:18:08,536 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED. > 2018-03-08 16:18:08,611 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(df707a3c9817ddf5936efe56d427e2bd). 
> 2018-03-08 16:18:08,634 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down.. > 2018-03-08 16:18:08,634 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0 > for job df707a3c9817ddf5936efe56d427e2bd from the resource manager. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:18:09,650 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager. > 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager > - Replacing old instance of worker for ResourceID > container_1519984124671_0090_01_05 > 2018-03-08 16:18:09,654 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - > Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager. > 2018-03-08 16:18:09,654 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager. > 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager > - The target with resource ID > container_1519984124671_0090_01_05 is already been monitored. > 2018-03-08 16:18:09,992 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager. > 2018-03-08 16:18:10,000 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. 
> 2018-03-08 16:18:10,028 ERROR > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception > on heartbeat > org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: > Application attempt appattempt_1519984124671_0090_01 doesn't exist in > ApplicationMasterService cache. > at > org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403) > at > org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl
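A plausible shutdown-ordering explanation for the log above is that the resource manager's heartbeat loop keeps firing after the application attempt has been unregistered from YARN; the usual remedy for this class of bug is to stop the periodic task before tearing down the registration it depends on. A JDK-only sketch of that ordering, with stand-in code rather than the actual AMRMClientAsync API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the shutdown-ordering issue: a periodic "heartbeat" must be
// stopped and drained *before* the registration it relies on is removed,
// otherwise a late tick observes the unregistered state (the exception
// in the log above). Stand-in code, not the YARN client API.
public class OrderedShutdown {
    static int heartbeatsAfterUnregister = 0;
    static volatile boolean registered = true;

    static int demo() {
        heartbeatsAfterUnregister = 0;
        registered = true;
        ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
        heartbeater.scheduleAtFixedRate(() -> {
            if (!registered) {
                heartbeatsAfterUnregister++;  // the buggy interleaving we want to avoid
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(50);                 // let a few heartbeats fire while registered
            heartbeater.shutdown();           // correct order: stop heartbeats first...
            heartbeater.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        registered = false;                   // ...then unregister
        return heartbeatsAfterUnregister;
    }

    public static void main(String[] args) {
        System.out.println("heartbeats after unregister: " + demo());
    }
}
```

Reversing the last two steps (unregister first, stop the timer later) is what would let a stray heartbeat hit an already-removed application attempt.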
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176011796 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this test runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.2.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz" + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 --- End diff -- An alternative to this: Query "read-records" metrics via the REST API, and only proceed after the job has processed a said number of records. This however requires adding an extra dependency, such as jq, for the response parsing. ---
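The reviewer's suggestion above — poll a records-read metric instead of sleeping a fixed 15 seconds — is an instance of a generic wait-until pattern. A JDK-only sketch, where the supplier stands in for the actual REST metric query (and for the jq-based parsing the comment mentions):

```java
import java.util.function.Supplier;

// Generic wait-until pattern: poll a condition (e.g. a records-read
// metric fetched from the REST API) until it holds or a deadline passes,
// instead of sleeping a fixed amount and hoping the job has caught up.
public class WaitUntil {
    static boolean waitUntil(Supplier<Boolean> condition, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (condition.get()) {
                return true;       // condition met: safe to take the savepoint
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;              // timed out: fail fast instead of hanging
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // stand-in condition; in the test script this would be a metric check
        boolean ok = waitUntil(() -> System.currentTimeMillis() - start >= 100, 5000, 10);
        System.out.println("condition reached: " + ok);
    }
}
```

The timeout branch matters as much as the success branch: a bounded wait turns a hung job into a clean test failure rather than a CI job that runs until the global timeout kills it.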
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407621#comment-16407621 ] ASF GitHub Bot commented on FLINK-8975: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176011796 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 --- End diff -- An alternative to this: query the "read-records" metrics via the REST API, and only proceed after the job has processed a given number of records. This, however, requires adding an extra dependency, such as jq, for response parsing. > End-to-end test: Resume from savepoint > -- > > Key: FLINK-8975 > URL: https://issues.apache.org/jira/browse/FLINK-8975 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > User usual
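The alternative suggested in the comment above could look roughly like the following sketch. The REST endpoint path and metric name are assumptions for illustration, and the parsing uses `sed` instead of `jq` to avoid the extra dependency; the actual `curl` call is replaced by a sample response so the parsing step can be shown in isolation.

```shell
# Sketch: wait until the job has processed a minimum number of records,
# instead of sleeping a fixed 15 seconds. The endpoint and metric name
# below are assumptions; a real check would query something like
#   http://localhost:8081/jobs/<jobid>/vertices/<vertexid>/metrics?get=numRecordsIn

# Extract the "value" field from a metrics response without jq.
extract_metric_value() {
  sed -n 's/.*"value":"\([0-9]*\)".*/\1/p'
}

# Sample response, standing in for: curl -s "$METRICS_URL"
RESPONSE='[{"id":"numRecordsIn","value":"200"}]'
NUM_RECORDS=$(echo "$RESPONSE" | extract_metric_value)

MIN_RECORDS=100
if [ "$NUM_RECORDS" -ge "$MIN_RECORDS" ]; then
  echo "job has processed $NUM_RECORDS records; proceeding"
fi
```

In the real test this loop would re-run the `curl` query with a short sleep between attempts until the threshold is reached.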
[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8899: - Priority: Major (was: Minor) > Submitting YARN job with FLIP-6 may lead to > ApplicationAttemptNotFoundException > --- > > Key: FLINK-8899 > URL: https://issues.apache.org/jira/browse/FLINK-8899 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Priority: Major > Labels: flip-6 > > Occasionally, running a simple word count as this > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > leads to an {{ApplicationAttemptNotFoundException}} in the logs: > {code} > 2018-03-08 16:18:08,507 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to > FINISHED. > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd > 2018-03-08 16:18:08,508 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:18:08,536 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED. > 2018-03-08 16:18:08,611 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(df707a3c9817ddf5936efe56d427e2bd). > 2018-03-08 16:18:08,634 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down.. 
> 2018-03-08 16:18:08,634 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0 > for job df707a3c9817ddf5936efe56d427e2bd from the resource manager. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:18:08,664 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:18:09,650 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager. > 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager > - Replacing old instance of worker for ResourceID > container_1519984124671_0090_01_05 > 2018-03-08 16:18:09,654 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - > Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager. > 2018-03-08 16:18:09,654 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager. > 2018-03-08 16:18:09,654 INFO org.apache.flink.yarn.YarnResourceManager > - The target with resource ID > container_1519984124671_0090_01_05 is already been monitored. > 2018-03-08 16:18:09,992 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager. > 2018-03-08 16:18:10,000 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. 
> 2018-03-08 16:18:10,028 ERROR > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception > on heartbeat > org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: > Application attempt appattempt_1519984124671_0090_01 doesn't exist in > ApplicationMasterService cache. > at > org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403) > at > org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60) > at > org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447) > at o
[jira] [Commented] (FLINK-8909) pyflink.sh not working with yarn
[ https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407633#comment-16407633 ] Hitesh Tiwari commented on FLINK-8909: -- Thanks. Configuring `python.dc.tmp.dir` fixed it for us. > pyflink.sh not working with yarn > > > Key: FLINK-8909 > URL: https://issues.apache.org/jira/browse/FLINK-8909 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.2 >Reporter: Hitesh Tiwari >Priority: Blocker > > Hi, > I want to run a Python application from pyflink.sh in yarn-cluster mode. > I added "-m yarn-cluster -yn 1" to pyflink.sh, so my updated pyflink.sh > executes the command below: > "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1 -v > "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@" > Running pyflink.sh: > ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py > While running it, I get the error below: > java.lang.Exception: The user defined 'open()' method caused an exception: An > error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. > at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 
3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > 03/09/2018 11:20:23 Job execution switched to status FAILING. > java.lang.Exception: The user defined 'open()' method caused an exception: > An error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. 
> at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:
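The fix mentioned in the comment above (setting `python.dc.tmp.dir`) can be expressed as a single entry in `flink-conf.yaml`. The value below is an assumption for illustration: on YARN it should point at a location that every node, and the user the containers run as (here `yarn`), can access, rather than the default local `/tmp/flink_dc`.

```yaml
# flink-conf.yaml -- the value is illustrative; on YARN choose a path
# reachable by all nodes and readable by the 'yarn' user, e.g. on HDFS.
python.dc.tmp.dir: hdfs:///tmp/flink_dc
```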
[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9038: Description: https://travis-ci.org/apache/flink/jobs/355821305 {code} est testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase) failed with: java.lang.AssertionError: Found a file /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log with a prohibited string (one of [Exception, Started SelectChannelConnector@0.0.0.0:8081]). Excerpts: [ 2018-03-20 15:44:20,533 ERROR org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception on heartbeat ] at org.junit.Assert.fail(Assert.java:88) at org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394) at org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at org.junit.runner.JUnitCore.run(JUnitCore.java:115) at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108) at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78) at org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54) at org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) {code} was:https://travis-ci.org/apache/flink/jobs/355821305 > YARNSessionCapacitySchedulerITCase fails on travis > -- > > Key: FLINK-9038 > URL: https://issues.apache.org/jira/browse/FLINK-9038 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/355821305 > {cod
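The assertion failing above comes from `YarnTestBase.ensureNoProhibitedStringInLogFiles`, which scans every log file produced by the YARN containers for a list of prohibited strings. A minimal shell approximation of that check, with a synthetic log directory and log line standing in for the real test output:

```shell
# Sketch of the prohibited-string scan; the directory and log line are
# synthetic stand-ins for the real flink-yarn-tests output.
LOG_DIR=$(mktemp -d)
printf '%s\n' '2018-03-20 15:44:20,533 ERROR AMRMClientAsyncImpl - Exception on heartbeat' \
  > "$LOG_DIR/jobmanager.log"

# The real test prohibits (among others) these two strings:
if grep -rq -e 'Exception' -e 'Started SelectChannelConnector@0.0.0.0:8081' "$LOG_DIR"; then
  RESULT="prohibited string found"
else
  RESULT="logs are clean"
fi
echo "$RESULT"
```

Because the sample log line contains "Exception", the scan reports a hit, which mirrors why the heartbeat error from FLINK-8899 makes this test fail.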
[jira] [Created] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis
Chesnay Schepler created FLINK-9038: --- Summary: YARNSessionCapacitySchedulerITCase fails on travis Key: FLINK-9038 URL: https://issues.apache.org/jira/browse/FLINK-9038 Project: Flink Issue Type: Bug Components: Tests, YARN Affects Versions: 1.5.0 Reporter: Chesnay Schepler Fix For: 1.5.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9038: Description: https://travis-ci.org/apache/flink/jobs/355821305 > YARNSessionCapacitySchedulerITCase fails on travis > -- > > Key: FLINK-9038 > URL: https://issues.apache.org/jira/browse/FLINK-9038 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/355821305 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5573 Changes look good @yanghua. Merging this PR. ---
[jira] [Closed] (FLINK-8909) pyflink.sh not working with yarn
[ https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hitesh Tiwari closed FLINK-8909. Resolution: Not A Problem > pyflink.sh not working with yarn > > > Key: FLINK-8909 > URL: https://issues.apache.org/jira/browse/FLINK-8909 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.2 >Reporter: Hitesh Tiwari >Priority: Blocker > > Hi, > i want to run the python application from pyflink.sh with yarn-cluster mode. > Added "-m yarn-cluster -yn 1 " in pyflink.sh. so my updated pyflink.sh is > executing below coomand: > "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1 -v > "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@" > Running pyflink.sh: > ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py > While running getting below Error: > java.lang.Exception: The user defined 'open()' method caused an exception: An > error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. > at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 
3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > 03/09/2018 11:20:23 Job execution switched to status FAILING. > java.lang.Exception: The user defined 'open()' method caused an exception: > An error occurred while copying the file. > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: An error occurred while copying the > file. 
> at > org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114) > at > org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100) > at > org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) > ... 3 more > Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist > or the user running Flink ('yarn') has insufficient permissions to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318) > at > org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.uti
[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407636#comment-16407636 ] ASF GitHub Bot commented on FLINK-8756: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5573 Changes look good @yanghua. Merging this PR. > Support ClusterClient.getAccumulators() in RestClusterClient > > > Key: FLINK-8756 > URL: https://issues.apache.org/jira/browse/FLINK-8756 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Aljoscha Krettek >Assignee: vinoyang >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407641#comment-16407641 ] Till Rohrmann commented on FLINK-9038: -- I think this one is a duplicate of FLINK-8899 or vice versa. > YARNSessionCapacitySchedulerITCase fails on travis > -- > > Key: FLINK-9038 > URL: https://issues.apache.org/jira/browse/FLINK-9038 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/355821305 > {code} > Test > testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase) > failed with: > java.lang.AssertionError: Found a file > /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log > with a prohibited string (one of [Exception, Started > SelectChannelConnector@0.0.0.0:8081]). 
Excerpts: > [ > 2018-03-20 15:44:20,533 ERROR > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception > on heartbeat > ] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394) > at > org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.Forke
[jira] [Closed] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis
[ https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-9038. Resolution: Duplicate > YARNSessionCapacitySchedulerITCase fails on travis > -- > > Key: FLINK-9038 > URL: https://issues.apache.org/jira/browse/FLINK-9038 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > > https://travis-ci.org/apache/flink/jobs/355821305 > {code} > est > testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase) > failed with: > java.lang.AssertionError: Found a file > /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log > with a prohibited string (one of [Exception, Started > SelectChannelConnector@0.0.0.0:8081]). Excerpts: > [ > 2018-03-20 15:44:20,533 ERROR > org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Exception > on heartbeat > ] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394) > at > org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78) > at > 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code}
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176019938 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this test runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.2.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 + +# take a savepoint of the state machine job +SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \ + | grep "Savepoint completed. Path:" | sed 's/.* //g') + +cancel_job $STATE_MACHINE_JOB + +# resume state machine job with savepoint +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +sleep 15 + +# if state is erroneous and the state machine job produces alerting state transitions, --- End diff -- when do the jobs shut down if no error occurs? ---
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176020327 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 --- End diff -- you could use the log4j reporter and grep the logs. ---
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407670#comment-16407670 ] ASF GitHub Bot commented on FLINK-8975: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176020327 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- (same diff as quoted in the pull-request comment above) --- End diff -- you could use the log4j reporter and grep the logs. > End-to-end test: Resume from savepoint > -- > > Key: FLINK-8975 > URL: https://issues.apache.org/jira/browse/FLINK-8975 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > User usually take a savepoint and want to resume from it. In order to verify > that Flink supports this feature, we should add an end-to-end test which > scripts this behavior. We should use the gen
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407669#comment-16407669 ] ASF GitHub Bot commented on FLINK-8975: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176019938 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- (same diff as quoted in the pull-request comment above) --- End diff -- when do the jobs shut down if no error occurs?
[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union
[ https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407683#comment-16407683 ] Fabian Hueske commented on FLINK-9031: -- Adding relevant information from the mail thread: [~StephanEwen] suggested {quote} To diagnose that, can you please check the following: - Change the Person data type to be immutable (final fields, no setters, set fields in constructor instead). Does that make the problem go away? - Change the Person data type to not be a POJO by adding a dummy fields that is never used, but does not have a getter/setter. Does that make the problem go away? If either of that is the case, it must be a mutability bug somewhere in either accidental object reuse or accidental serializer sharing. {quote} Making the Person object immutable solved the problem. > DataSet Job result changes when adding rebalance after union > > > Key: FLINK-9031 > URL: https://issues.apache.org/jira/browse/FLINK-9031 > Project: Flink > Issue Type: Bug > Components: DataSet API, Local Runtime, Optimizer >Affects Versions: 1.3.1 >Reporter: Fabian Hueske >Priority: Critical > Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt > > > A user [reported this issue on the user mailing > list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E]. > {quote}I am using Flink 1.3.1 and I have found a strange behavior on running > the following logic: > # Read data from file and store into DataSet > # Split dataset in two, by checking if "field1" of POJOs is empty or not, so > that the first dataset contains only elements with non empty "field1", and > the second dataset will contain the other elements. > # Each dataset is then grouped by, one by "field1" and other by another > field, and subsequently reduced. > # The 2 datasets are merged together by union. > # The final dataset is written as json. 
> What I expected, from the output, was to find only one element with a > specific value of "field1" because: > # Reducing the first dataset grouped by "field1" should generate only one > element with a specific value of "field1". > # The second dataset should contain only elements with empty "field1". > # Making a union of them should not duplicate any record. > This does not happen. When I read the generated jsons I see some duplicate > (non empty) values of "field1". > Strangely this does not happen when the union between the two datasets is > not computed. In this case the first dataset produces elements only with > distinct values of "field1", while the second dataset produces only records with > empty field "value1". > {quote} > The user has not enabled object reuse. > Later he reports that the problem disappears when he injects a rebalance() > after the union. I had a look at the execution plans for > both cases (attached to this issue) but could not identify a problem. > Hence I assume this might be an issue with the runtime code but we need to > look deeper into this. The user also provided an example program consisting > of two classes which are attached to the issue as well. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
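The fix that resolved the issue can be sketched with a minimal, hypothetical reconstruction of the user's type. Only "field1" is named in the ticket, so the second field and the helper method below are assumptions. With final fields set in the constructor and no setters, the class is no longer a Flink POJO (so a generic serializer is used), and no operator can mutate a record another operator still holds, which is exactly the accidental-object-reuse failure mode being ruled out.

```java
// Hypothetical immutable variant of the user's Person type. A real Flink
// POJO would be a public mutable bean; this deliberately is not one.
final class Person {
    private final String field1;
    private final String field2; // illustrative second field

    Person(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    String getField1() { return field1; }
    String getField2() { return field2; }

    // A reduce step returns a fresh instance instead of mutating its input,
    // so state cannot leak between chained operators.
    Person withField1(String newField1) {
        return new Person(newField1, field2);
    }
}
```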
[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5734 [FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the TaskExecutor is shut down ## What is the purpose of the change We should close the `TaskManagerMetricGroup` when the `TaskExecutor` is shutdown. ## Brief change log - close the `TaskManagerMetricGroup` when the `TaskExecutor` is shutdown. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK_9026 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5734.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5734 commit 94ecbc87e53b4fff306a864971f164c765122194 Author: sihuazhou Date: 2018-03-21T09:46:31Z close the TaskManagerMetricGroup when the TaskExecutor is shut down ---
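The lifecycle pattern the PR describes — the component that owns a metric group closes it during its own shutdown — can be modeled with a self-contained sketch. The class names below are illustrative stand-ins, not Flink's actual `TaskExecutor`/`TaskManagerMetricGroup` implementations.

```java
// Illustrative sketch: the owner of a metric group is responsible for
// closing it exactly once when the owner itself is shut down.
class SketchMetricGroup {
    private boolean closed = false;

    boolean isClosed() { return closed; }

    // In Flink, closing the TaskManagerMetricGroup also unregisters its
    // child task groups from the configured reporters.
    void close() { closed = true; }
}

class SketchTaskExecutor {
    private final SketchMetricGroup taskManagerMetricGroup = new SketchMetricGroup();

    SketchMetricGroup getTaskManagerMetricGroup() { return taskManagerMetricGroup; }

    // Mirrors the change in the PR: the shutdown hook (postStop) closes the
    // metric group after the executor's other services are released.
    void postStop() {
        // ... release slots, stop services, etc. ...
        taskManagerMetricGroup.close();
    }
}
```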
[GitHub] flink issue #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricGroup wh...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5734 CC: @tillrohrmann ---
[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it
[ https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407686#comment-16407686 ] ASF GitHub Bot commented on FLINK-9026: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5734 [FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the TaskExecutor is shut down (pull-request description as quoted above) > Unregister finished tasks from TaskManagerMetricGroup and close it > -- > > Key: FLINK-9026 > URL: https://issues.apache.org/jira/browse/FLINK-9026 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they > have reached a final state. Moreover, we should close the > {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or > let the caller do this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it
[ https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407688#comment-16407688 ] ASF GitHub Bot commented on FLINK-9026: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5734 CC: @tillrohrmann (issue details as quoted above) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176027456 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 + +# take a savepoint of the state machine job +SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \ + | grep "Savepoint completed. Path:" | sed 's/.* //g') + +cancel_job $STATE_MACHINE_JOB + +# resume state machine job with savepoint +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +sleep 15 + +# if state is errorneous and the state machine job produces alerting state transitions, --- End diff -- when the script exits, a cleanup hook shuts down the cluster. It also parses the logs for any unexpected errors; if there is one, the test fails. ---
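The log check described in the comment above can be modeled in isolation. This is a self-contained sketch, not Flink's actual test harness: the real cleanup hook greps the log files from shell, and the "ERROR"/"Exception" markers below are assumptions rather than the harness's exact patterns.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the post-run log scan: collect every line that looks like an
// unexpected error; an empty result means the run is considered clean and
// the test passes.
final class LogChecker {
    static List<String> unexpectedErrors(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("ERROR") || line.contains("Exception"))
                .collect(Collectors.toList());
    }
}
```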
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407691#comment-16407691 ] ASF GitHub Bot commented on FLINK-8975: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176027456 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- (same diff as quoted in the pull-request comment above) --- End diff -- when the script exits, a cleanup hook shuts down the cluster. It also parses the logs for any unexpected errors; if there is one, the test fails.
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176027636 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 --- End diff -- That's a good idea! I'll give this approach a try. ---
[jira] [Created] (FLINK-9039) Broken link to Hadoop Setup Guide in docs
Florian Schmidt created FLINK-9039: -- Summary: Broken link to Hadoop Setup Guide in docs Key: FLINK-9039 URL: https://issues.apache.org/jira/browse/FLINK-9039 Project: Flink Issue Type: Bug Affects Versions: 1.4.2 Reporter: Florian Schmidt On [https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/dependencies.html] under the section Hadoop Dependencies there is a link to "Hadoop Setup Guide" which links to [https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/hadoop.html], which in turn does not exist. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5734#discussion_r176028191 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -292,6 +292,13 @@ public void start() throws Exception { throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } + try { + // it will call close() recursively from the parent to children + taskManagerMetricGroup.close(); --- End diff -- this method never throws exceptions ---
[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it
[ https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407696#comment-16407696 ] ASF GitHub Bot commented on FLINK-9026: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5734#discussion_r176028213 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -265,7 +265,11 @@ class TaskManager( case t: Exception => log.error("FileCache did not shutdown properly.", t) } -taskManagerMetricGroup.close() +try { + taskManagerMetricGroup.close() --- End diff -- same as above > Unregister finished tasks from TaskManagerMetricGroup and close it > -- > > Key: FLINK-9026 > URL: https://issues.apache.org/jira/browse/FLINK-9026 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Sihua Zhou >Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they > have reached a final state. Moreover, we should close the > {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or > let the caller do this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
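The review note above mentions that closing the {{TaskManagerMetricGroup}} "will call close() recursively from the parent to children". A minimal sketch of such a cascading close, assuming a simple parent/child tree; the {{Group}} class here is a hypothetical stand-in, not Flink's actual metric group API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: Group stands in for a hierarchical metric group
// where closing a parent must close the whole subtree of children.
public class GroupCloseSketch {
    static class Group {
        final List<Group> children = new ArrayList<>();
        boolean closed = false;

        Group addChild() {
            Group child = new Group();
            children.add(child);
            return child;
        }

        // closing a parent closes all (transitive) children; idempotent
        void close() {
            if (closed) {
                return;
            }
            closed = true;
            for (Group child : children) {
                child.close();
            }
        }
    }

    public static void main(String[] args) {
        Group taskManagerGroup = new Group();
        Group taskGroup = taskManagerGroup.addChild();
        Group operatorGroup = taskGroup.addChild();

        // one call at the root is enough
        taskManagerGroup.close();

        // prints true true
        System.out.println(taskGroup.closed + " " + operatorGroup.closed);
    }
}
```

This is why a single {{taskManagerMetricGroup.close()}} in the executor's shutdown path suffices, without touching each task group individually.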
[GitHub] flink pull request #5716: [FLINK-9022][state] fix resource release in Stream...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5716 ---
[GitHub] flink pull request #5726: [FLINK-9028][flip6] perform parameters checking be...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5726 ---
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176028721 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup +} +trap kafka_cleanup INT +trap kafka_cleanup EXIT + +# zookeeper outputs the "Node does not exist" bit to stderr +while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do + echo "Waiting for broker..." 
+ sleep 1 +done + +# create the required topic +$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input + +# run the state machine example job +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +# then, run the events generator +EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input --sleep 200 \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $EVENTS_GEN_JOB + +# wait a bit to have some events pass through the state machine +sleep 15 + +# take a savepoint of the state machine job +SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \ + | grep "Savepoint completed. Path:" | sed 's/.* //g') + +cancel_job $STATE_MACHINE_JOB + +# resume state machine job with savepoint +STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \ + --kafka-topic test-input \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + +wait_job_running $STATE_MACHINE_JOB + +sleep 15 + +# if state is erroneous and the state machine job produces alerting state transitions, --- End diff -- The only trap I see is `kafka_cleanup`, which shuts down ZK and Kafka. What about the state machine though? ---
[jira] [Resolved] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
[ https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9022. -- Resolution: Fixed Fixed via master: f9df13c5058f194a5c686b9b753345d9226fc87a 1.5.0: 27189d8058d6c3bc00dbc8409f40bedbccf01ac5 > fix resource close in > `StreamTaskStateInitializerImpl.streamOperatorStateContext()` > --- > > Key: FLINK-9022 > URL: https://issues.apache.org/jira/browse/FLINK-9022 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > We have the following code in > {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is > incorrect: > {code} > } catch (Exception ex) { > // cleanup if something went wrong before results got published. > if > (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) { > IOUtils.closeQuietly(keyedStatedBackend); > } > if > (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) { > IOUtils.closeQuietly(keyedStatedBackend); // this should close > operatorStateBackend > } > if > (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) { > IOUtils.closeQuietly(rawKeyedStateInputs); > } > if > (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) { > IOUtils.closeQuietly(rawOperatorStateInputs); > } > if > (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) { > IOUtils.closeQuietly(rawOperatorStateInputs); > } > throw new Exception("Exception while creating > StreamOperatorStateContext.", ex); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
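The root cause quoted above is a copy-paste slip: the {{operatorStateBackend}} branch closes {{keyedStatedBackend}} instead of the resource it just unregistered. A minimal, self-contained sketch of the corrected pattern follows; the {{Registry}}, {{Resource}}, and {{closeQuietly}} names are illustrative stand-ins, not Flink's actual {{CloseableRegistry}} / {{IOUtils}} API:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch of the corrected cleanup: on failure, each resource that is
// successfully unregistered must be closed itself. The reported bug
// closed the keyed backend twice and never closed the operator backend.
public class CleanupSketch {

    // Hypothetical stand-in for a closeable registry.
    static class Registry {
        private final Set<Closeable> registered = new HashSet<>();
        void register(Closeable c) { registered.add(c); }
        // returns true only if the closeable was still registered
        boolean unregisterCloseable(Closeable c) { return registered.remove(c); }
    }

    static class Resource implements Closeable {
        boolean closed = false;
        @Override
        public void close() { closed = true; }
    }

    // close without propagating IOException, like IOUtils.closeQuietly
    static void closeQuietly(Closeable c) {
        try {
            if (c != null) {
                c.close();
            }
        } catch (IOException ignored) {
            // swallowed on purpose during error cleanup
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Resource keyedBackend = new Resource();
        Resource operatorBackend = new Resource();
        registry.register(keyedBackend);
        registry.register(operatorBackend);

        // corrected: each branch closes the resource it just unregistered
        if (registry.unregisterCloseable(keyedBackend)) {
            closeQuietly(keyedBackend);
        }
        if (registry.unregisterCloseable(operatorBackend)) {
            closeQuietly(operatorBackend); // was closeQuietly(keyedStatedBackend) in the bug
        }

        // prints true true
        System.out.println(keyedBackend.closed + " " + operatorBackend.closed);
    }
}
```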
[jira] [Assigned] (FLINK-8980) End-to-end test: BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Florian Schmidt reassigned FLINK-8980: -- Assignee: Florian Schmidt > End-to-end test: BucketingSink > -- > > Key: FLINK-8980 > URL: https://issues.apache.org/jira/browse/FLINK-8980 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Florian Schmidt >Priority: Blocker > Fix For: 1.5.0 > > > In order to verify the {{BucketingSink}}, we should add an end-to-end test > which verifies that the {{BucketingSink}} does not lose data under failures. > An idea would be to have a CountUp job which simply counts up a counter which > is persisted. The emitted values will be written to disk by the > {{BucketingSink}}. Now we should kill randomly Flink processes (cluster > entrypoint and TaskExecutors) to simulate failures. Even after these > failures, the written files should contain the correct sequence of numbers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9028) flip6 should check config before starting cluster
[ https://issues.apache.org/jira/browse/FLINK-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9028. -- Resolution: Fixed Fixed via master: 38aa863d5a710b283b5c9b2eb9225d6fb9cc0c70 7c952dd3a75bc64d10bf9be12e405bbc349422b1 1.5.0: fecc19088b36fc4c8bca5ff39ba756f8fd71 c6f91334b67d589f0c17ed75c9dbcbaedaf8ba51 > flip6 should check config before starting cluster > - > > Key: FLINK-9028 > URL: https://issues.apache.org/jira/browse/FLINK-9028 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > In flip6, we should perform parameters checking before starting cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5734#discussion_r176029020 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -292,6 +292,13 @@ public void start() throws Exception { throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } + try { + // it will call close() recursively from the parent to children + taskManagerMetricGroup.close(); --- End diff -- I was intended to catch maybe some `RuntimeException`... I will just remove the `try catch`. ---
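The shutdown code quoted in this thread aggregates failures with {{ExceptionUtils.firstOrSuppressed(t, throwable)}} so that one failing cleanup step does not skip the rest. A hedged sketch of that pattern, with {{firstOrSuppressed}} re-implemented locally for illustration (the step names are made up):

```java
// Sketch: run every shutdown step, remember the first failure, and attach
// later failures as suppressed exceptions instead of losing them.
public class ShutdownSketch {

    // local stand-in for Flink's ExceptionUtils.firstOrSuppressed
    static Throwable firstOrSuppressed(Throwable newThrowable, Throwable previous) {
        if (previous == null) {
            return newThrowable;
        }
        previous.addSuppressed(newThrowable);
        return previous;
    }

    public static void main(String[] args) {
        Throwable error = null;
        Runnable[] steps = {
            () -> { throw new IllegalStateException("step 1 failed"); },
            () -> { /* e.g. a close() that is assumed never to throw */ },
            () -> { throw new IllegalStateException("step 3 failed"); },
        };

        for (Runnable step : steps) {
            try {
                step.run();
            } catch (Throwable t) {
                error = firstOrSuppressed(t, error);
            }
        }

        // the first failure is reported; later ones ride along as suppressed
        System.out.println(error.getMessage());           // step 1 failed
        System.out.println(error.getSuppressed().length); // 1
    }
}
```

Under this pattern an extra try/catch around a close() that never throws is harmless but redundant, which is the point of the review comment above.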
[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176029439 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup --- End diff -- @zentol the `kafka_cleanup` trap also includes this, which shuts down the Flink cluster and checks logs for errors. ---
[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint
[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407705#comment-16407705 ] ASF GitHub Bot commented on FLINK-8975: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5733#discussion_r176029439 --- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh --- @@ -0,0 +1,102 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +source "$(dirname "$0")"/common.sh + +start_cluster + +# this tests runs 2 streaming jobs; adding extra taskmanagers for more slots +add_taskmanagers 1 + +# get Kafka 0.10.0 +mkdir -p $TEST_DATA_DIR +if [ -z "$3" ]; then + # need to download Kafka because no Kafka was specified on the invocation + KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"; + echo "Downloading Kafka from $KAFKA_URL" + curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz +else + echo "Using specified Kafka from $3" + cp $3 $TEST_DATA_DIR/kafka.tgz +fi + +tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/ +KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0 + +# fix kafka config +sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties +sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties +$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties +$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties + +# make sure to stop Kafka and ZooKeeper at the end + +function kafka_cleanup { + $KAFKA_DIR/bin/kafka-server-stop.sh + $KAFKA_DIR/bin/zookeeper-server-stop.sh + + # make sure to run regular cleanup as well + cleanup --- End diff -- @zentol the `kafka_cleanup` trap also includes this, which shuts down the Flink cluster and checks logs for errors. > End-to-end test: Resume from savepoint > -- > > Key: FLINK-8975 > URL: https://issues.apache.org/jira/browse/FLINK-8975 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0 > > > User usually take a savepoint and want to resume from it. In order to verify > that Flink supports this feature, we should add an end-to-end test which > scripts this behavior. We should use the general purpose testing job > FLINK-8971 with failures disabled for that. 
> The end-to-end test should do the following: > * Submit FLINK-8971 job > * Verify that the savepoint is there > * Cancel job and resume from savepoint > * Verify that job could be resumed > * Use different StateBackends: RocksDB incremental async/sync, RocksDB full > async/sync, FsStateBackend async/sync -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5717: [FLINK-9020][E2E Tests] Use separate modules per t...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5717#discussion_r176032646 --- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml --- @@ -0,0 +1,107 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + + 4.0.0 + + parent-child-classloading-test --- End diff -- * add `flink` prefix * add `_${scala.binary.version}` suffix * add `flink-parent-child-classloading-test` (this is what allows us to omit the scala stuff when listing child modules) ---
[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules
[ https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407724#comment-16407724 ] ASF GitHub Bot commented on FLINK-9020: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5717#discussion_r176032646 --- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml --- @@ -0,0 +1,107 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + + flink-end-to-end-tests + org.apache.flink + 1.6-SNAPSHOT + .. + + + 4.0.0 + + parent-child-classloading-test --- End diff -- * add `flink` prefix * add `_${scala.binary.version}` suffix * add `flink-parent-child-classloading-test` (this is what allows us to omit the scala stuff when listing child modules) > Move test projects of end-to-end tests in separate modules > -- > > Key: FLINK-9020 > URL: https://issues.apache.org/jira/browse/FLINK-9020 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > > I would like to propose to move each test case in the end-to-end tests into > it's own module. Reason is that currently we are building all jars for the > tests from one pom.xml, which makes it hard to have specific tests for > certain build types (e.g. examples derived from the flink quickstart > archetype). > For the current state this would mean > - change packaging from flink-end-to-end-tests from jar to pom > - refactor the classloader example to be in its own module -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9010) NoResourceAvailableException with FLIP-6
[ https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407726#comment-16407726 ] Till Rohrmann commented on FLINK-9010: -- Does your Yarn cluster have enough resources to run this program? If your program consists of 2 operators and you run it with DOP 400, then it should require 800 slots (logical). If the two operators are in the same slot sharing group, then two logical slots will be deployed to the same {{TaskExecutor}} slot. Thus, I'm not sure whether this is an actual problem here. Please verify, and if this is the case, let's close this issue. > NoResourceAvailableException with FLIP-6 > - > > Key: FLINK-9010 > URL: https://issues.apache.org/jira/browse/FLINK-9010 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) > with FLIP-6 mode and a checkpointing interval of 1000 and got the following > exception: > {code} > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000101 - Remaining pending container > requests: 302 > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000101 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,154 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5
path obtained null > 2018-03-16 03:41:20,155 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml > 2018-03-16 03:41:20,165 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,168 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 
1> > /taskmanager.out 2> /taskmanager.err > 2018-03-16 03:41:20,176 INFO > org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - > Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager >
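Till's slot arithmetic above (2 operators at parallelism 400 give 800 logical slots, which slot sharing collapses onto 400 {{TaskExecutor}} slots) can be sketched as follows. `SlotMath` and its methods are illustrative helpers for the back-of-the-envelope calculation, not Flink API.

```java
public class SlotMath {
    // Logical slots: one per operator subtask.
    static int logicalSlots(int operators, int parallelism) {
        return operators * parallelism;
    }

    // With slot sharing, subtasks of different operators in the same
    // sharing group co-locate, so the physical slot demand shrinks to
    // the (maximum) parallelism rather than operators * parallelism.
    static int physicalSlots(int operators, int parallelism, boolean slotSharing) {
        return slotSharing ? parallelism : operators * parallelism;
    }

    public static void main(String[] args) {
        System.out.println(logicalSlots(2, 400));        // 800 logical slots
        System.out.println(physicalSlots(2, 400, true)); // 400 TaskExecutor slots
    }
}
```

With 100 TMs of 2 slots each (200 physical slots), a 400-slot demand would indeed be unsatisfiable, which is why the question matters before treating the exception as a bug.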
[GitHub] flink issue #5717: [FLINK-9020][E2E Tests] Use separate modules per testcase
Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5717 Thanks @zentol! I addressed your comments and will squash if approved ---
[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules
[ https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407732#comment-16407732 ] ASF GitHub Bot commented on FLINK-9020: --- Github user florianschmidt1994 commented on the issue: https://github.com/apache/flink/pull/5717 Thanks @zentol! I addressed your comments and will squash if approved > Move test projects of end-to-end tests in separate modules > -- > > Key: FLINK-9020 > URL: https://issues.apache.org/jira/browse/FLINK-9020 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > > I would like to propose to move each test case in the end-to-end tests into > it's own module. Reason is that currently we are building all jars for the > tests from one pom.xml, which makes it hard to have specific tests for > certain build types (e.g. examples derived from the flink quickstart > archetype). > For the current state this would mean > - change packaging from flink-end-to-end-tests from jar to pom > - refactor the classloader example to be in its own module -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument
Chesnay Schepler created FLINK-9040: --- Summary: JobVertex#setMaxParallelism does not validate argument Key: FLINK-9040 URL: https://issues.apache.org/jira/browse/FLINK-9040 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.5.0 Reporter: Chesnay Schepler {code} /** * Sets the maximum parallelism for the task. * * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE. */ public void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; } {code}
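A minimal sketch of the missing check, using the 1..Short.MAX_VALUE range documented in the Javadoc quoted above. `JobVertexSketch` is a standalone stand-in for illustration; Flink itself would likely use its `checkArgument`-style preconditions rather than a hand-rolled throw.

```java
public class JobVertexSketch {
    private int maxParallelism;

    /**
     * Sets the maximum parallelism for the task, validating the documented
     * range of 1..Short.MAX_VALUE (the check the ticket reports as missing).
     */
    public void setMaxParallelism(int maxParallelism) {
        if (maxParallelism < 1 || maxParallelism > Short.MAX_VALUE) {
            throw new IllegalArgumentException(
                "maxParallelism must be between 1 and " + Short.MAX_VALUE
                    + ", was: " + maxParallelism);
        }
        this.maxParallelism = maxParallelism;
    }

    public int getMaxParallelism() {
        return maxParallelism;
    }
}
```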
[GitHub] flink issue #5729: [FLINK-7343][kafka-tests] Fix test at-least-once test ins...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5729 Sweet, nice to see this fixed. Code looks good, +1 to merge! ---
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407737#comment-16407737 ] ASF GitHub Bot commented on FLINK-7343: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5729 Sweet, nice to see this fixed. Code looks good, +1 to merge! > Kafka010ProducerITCase instability > -- > > Key: FLINK-7343 > URL: https://issues.apache.org/jira/browse/FLINK-7343 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > As reported by [~till.rohrmann] in > https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test > instability with > `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` > https://travis-ci.org/tillrohrmann/flink/jobs/258538641 > It is probably related to log.flush intervals in Kafka, which delay flushing > the data to files and potentially causing data loses on killing Kafka brokers > in the tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5725 Thank you for trying to help improve Flink. I think, however, this patch is not necessary. The method does not require synchronization; it should be correct as it is. ---
[jira] [Assigned] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument
[ https://issues.apache.org/jira/browse/FLINK-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9040: - Assignee: Sihua Zhou > JobVertex#setMaxParallelism does not validate argument > --- > > Key: FLINK-9040 > URL: https://issues.apache.org/jira/browse/FLINK-9040 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Sihua Zhou >Priority: Minor > > {code} > /** > * Sets the maximum parallelism for the task. > * > * @param maxParallelism The maximum parallelism to be set. must be between 1 > and Short.MAX_VALUE. > */ > public void setMaxParallelism(int maxParallelism) { > this.maxParallelism = maxParallelism; > } > {code}
[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
[ https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407739#comment-16407739 ] ASF GitHub Bot commented on FLINK-8394: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5725 Thank you for trying to help improve Flink. I think, however, this patch is not necessary. The method does not require synchronization; it should be correct as it is. > Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown > --- > > Key: FLINK-8394 > URL: https://issues.apache.org/jira/browse/FLINK-8394 > Project: Flink > Issue Type: Test > Components: Streaming >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public void shutdown() { > running = false; > interrupt(); > expectedRecord.complete(0L); > {code} > Access to expectedRecord should be protected by synchronization, as done on > other methods.
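Stephan's point can be illustrated. Assuming `expectedRecord` is a `CompletableFuture`, as the `complete(0L)` call in the quoted snippet suggests, `complete()` is itself atomic: the first caller wins and later calls are no-ops, so no external lock is needed around it. A small standalone demonstration:

```java
import java.util.concurrent.CompletableFuture;

public class CompleteRace {
    // Races two unsynchronized complete() calls; exactly one wins
    // and the future ends up with a single, consistent value.
    static long raceComplete() throws Exception {
        CompletableFuture<Long> expected = new CompletableFuture<>();
        Thread a = new Thread(() -> expected.complete(1L));
        Thread b = new Thread(() -> expected.complete(2L));
        a.start();
        b.start();
        a.join();
        b.join();
        return expected.get(); // value of whichever complete() won the race
    }

    public static void main(String[] args) throws Exception {
        long v = raceComplete();
        System.out.println(v == 1L || v == 2L); // always true
    }
}
```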
[GitHub] flink pull request #5725: [FLINK-8394] Lack of synchronization accessing exp...
Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5725 ---
[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5725 @StephanEwen OK, let me close this PR, thanks. ---
[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
[ https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407742#comment-16407742 ] ASF GitHub Bot commented on FLINK-8394: --- Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5725 > Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown > --- > > Key: FLINK-8394 > URL: https://issues.apache.org/jira/browse/FLINK-8394 > Project: Flink > Issue Type: Test > Components: Streaming >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public void shutdown() { > running = false; > interrupt(); > expectedRecord.complete(0L); > {code} > Access to expectedRecord should be protected by synchronization, as done on > other methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
[ https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407741#comment-16407741 ] ASF GitHub Bot commented on FLINK-8394: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5725 @StephanEwen OK, let me close this PR, thanks. > Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown > --- > > Key: FLINK-8394 > URL: https://issues.apache.org/jira/browse/FLINK-8394 > Project: Flink > Issue Type: Test > Components: Streaming >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public void shutdown() { > running = false; > interrupt(); > expectedRecord.complete(0L); > {code} > Access to expectedRecord should be protected by synchronization, as done on > other methods. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka
Chesnay Schepler created FLINK-9041: --- Summary: Refactor StreamTaskTest to not use scala and akka Key: FLINK-9041 URL: https://issues.apache.org/jira/browse/FLINK-9041 Project: Flink Issue Type: Improvement Components: Streaming, Tests Affects Versions: 1.5.0 Reporter: Chesnay Schepler
[jira] [Closed] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
[ https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang closed FLINK-8394. --- Resolution: Won't Fix > Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown > --- > > Key: FLINK-8394 > URL: https://issues.apache.org/jira/browse/FLINK-8394 > Project: Flink > Issue Type: Test > Components: Streaming >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > public void shutdown() { > running = false; > interrupt(); > expectedRecord.complete(0L); > {code} > Access to expectedRecord should be protected by synchronization, as done on > other methods.
[jira] [Assigned] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask
[ https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-7897: --- Assignee: vinoyang > Consider using nio.Files for file deletion in TransientBlobCleanupTask > -- > > Key: FLINK-7897 > URL: https://issues.apache.org/jira/browse/FLINK-7897 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > nio.Files#delete() provides a better clue as to why the deletion may fail: > https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path) > Depending on the potential exception (FileNotFound), the call to > localFile.exists() may be skipped.
[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5718 Having a custom timeout like @zentol suggests could be helpful in the long run. For now, though, I personally agree that the timeouts are counterproductive (I actually never understood what they are supposed to solve). I prefer that a deadlocking test stay around, so I can jstack to the process, attach a debugger, pull a heap dump, whatever. The only environment where we cannot do that is the CI server, which has a timeout and thread dump already... ---
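A non-killing alternative to per-test timeouts, in the spirit of the CI watchdog Stephan mentions, could dump thread stacks when a task overruns while leaving the stuck process alive for jstack or a debugger. This is an illustrative sketch, not Flink's or the CI server's actual mechanism.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

public class Watchdog {
    // Runs `task` on a worker thread; if it exceeds timeoutMillis, prints a
    // full thread dump (so the deadlock is diagnosable) instead of killing
    // anything, then keeps waiting so the process stays attachable.
    public static void runWithDump(Runnable task, long timeoutMillis)
            throws InterruptedException {
        Thread worker = new Thread(task, "test-worker");
        worker.start();
        worker.join(timeoutMillis);
        if (worker.isAlive()) {
            for (ThreadInfo info
                    : ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
                System.out.print(info);
            }
            worker.join(); // leave the process around for jstack / debugger
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runWithDump(() -> { /* fast task finishes within the timeout */ }, 1000);
        System.out.println("done");
    }
}
```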
[jira] [Created] (FLINK-9042) Port ResumeCheckpointManuallyITCase to flip6
Chesnay Schepler created FLINK-9042: --- Summary: Port ResumeCheckpointManuallyITCase to flip6 Key: FLINK-9042 URL: https://issues.apache.org/jira/browse/FLINK-9042 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing, Tests Affects Versions: 1.5.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.5.0
[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
[ https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407745#comment-16407745 ] ASF GitHub Bot commented on FLINK-8073: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5718 Having a custom timeout like @zentol suggests could be helpful in the long run. For now, though, I personally agree that the timeouts are counter productive (I actually never understood what they are supposed to solve). I prefer a deadlocking test to stay around, so I can jstack to the process, attach a debugger, pull a heap dump, whatever. The only environment where we cannot do that is the CI server, which has a timeout and thread dump already... > Test instability > FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint() > - > > Key: FLINK-8073 > URL: https://issues.apache.org/jira/browse/FLINK-8073 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5712: [FLINK-9011] YarnResourceManager spamming log file at INF...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5712 cc @tillrohrmann could you have a look at this? ---
[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level
[ https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407749#comment-16407749 ] ASF GitHub Bot commented on FLINK-9011: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5712 cc @tillrohrmann could you have a look at this? > YarnResourceManager spamming log file at INFO level > --- > > Key: FLINK-9011 > URL: https://issues.apache.org/jira/browse/FLINK-9011 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For every requested resource, the {{YarnResourceManager}} spams the log with > log-level INFO and the following messages: > {code} > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,181 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > to > 
hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > 2018-03-16 03:41:20,190 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > /taskmanager.out 2> /taskmanager.err > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9041) Refactor StreamTaskTest to not use scala and akka
[ https://issues.apache.org/jira/browse/FLINK-9041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reassigned FLINK-9041: - Assignee: Sihua Zhou > Refactor StreamTaskTest to not use scala and akka > - > > Key: FLINK-9041 > URL: https://issues.apache.org/jira/browse/FLINK-9041 > Project: Flink > Issue Type: Improvement > Components: Streaming, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Sihua Zhou >Priority: Trivial >
[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover
[ https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407756#comment-16407756 ] vinoyang commented on FLINK-8946: - [~till.rohrmann] I just traced this issue through the source code. For FLIP-6, looking at *TaskExecutor*, the *TaskManagerMetricGroup* instance is injected as a constructor parameter, and that instance is created in *TaskManagerRunner.startTaskManager*. *TaskExecutor* does not close the instance, so it seems this is not a problem in FLIP-6. > TaskManager stop sending metrics after JobManager failover > -- > > Key: FLINK-8946 > URL: https://issues.apache.org/jira/browse/FLINK-8946 > Project: Flink > Issue Type: Bug > Components: Metrics, TaskManager >Affects Versions: 1.4.2 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Critical > Fix For: 1.5.0 > > > Running in Yarn-standalone mode, when the Job Manager performs a failover, > all TaskManager that are inherited from the previous JobManager will not send > metrics to the new JobManager and other registered metric reporters. > > A cursory glance reveals that these lines of code might be the cause > [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086] > Perhaps the TaskManager closes its metrics group when disassociating from the > JobManager, but does not create a new one on fail-over association? >
[GitHub] flink issue #5705: [FLINK-8968][state]Fix native resource leak caused by Rea...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5705 Hi @tillrohrmann could you please have a look at this? This PR covers multiple bugs that should be addressed... I think Stefan may be too busy with the testing work for 1.5 currently... ---
[jira] [Commented] (FLINK-8968) Fix native resource leak caused by ReadOptions
[ https://issues.apache.org/jira/browse/FLINK-8968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407760#comment-16407760 ] ASF GitHub Bot commented on FLINK-8968: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/5705 Hi @tillrohrmann could you please have a look at this? This PR covers multiple bugs that should be addressed... I think Stefan may be too busy with the testing work for 1.5 currently... > Fix native resource leak caused by ReadOptions > --- > > Key: FLINK-8968 > URL: https://issues.apache.org/jira/browse/FLINK-8968 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Major > Fix For: 1.5.0 > > > We should pull the creation of ReadOptions out of the loop in > {{RocksDBFullSnapshotOperation.writeKVStateMetaData()}}.
[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...
Github user yew1eb commented on a diff in the pull request: https://github.com/apache/flink/pull/5712#discussion_r176047433 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext( require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - log.info("TM:remote keytab path obtained {}", remoteKeytabPath); - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + if (log.isDebugEnabled()) { --- End diff -- Sorry, I forgot to tell you that the `log.debug()` method internally calls `islogEnabled()`. You can see the `org.slf4j.Logger`'s log4j adapter class: ``` //org.slf4j.impl.Log4jLoggerAdapter.java ... public void debug(String format, Object arg) { if (logger.isDebugEnabled()) { FormattingTuple ft = MessageFormatter.format(format, arg); logger.log(FQCN, Level.DEBUG, ft.getMessage(), ft.getThrowable()); } } ... ``` ---
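yew1eb's point above is that the slf4j adapter's `debug(String, Object)` performs the `isDebugEnabled()` check internally, so an outer guard buys nothing unless the *arguments themselves* are expensive to compute. This can be shown with a simplified stand-in logger; `MiniLogger` is a toy model of the adapter, not the real slf4j API.

```java
public class GuardDemo {
    static int formatCount = 0; // how many times a message was actually formatted

    // Toy model of Log4jLoggerAdapter.debug: the level check happens inside,
    // so message formatting is skipped when DEBUG is off.
    static class MiniLogger {
        final boolean debugEnabled;

        MiniLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        boolean isDebugEnabled() {
            return debugEnabled;
        }

        void debug(String format, Object arg) {
            if (debugEnabled) { // internal guard, as in the adapter
                formatCount++;  // MessageFormatter.format would run here
            }
        }
    }

    // With DEBUG off, no formatting happens even without an outer guard.
    // Note the argument ("value" + i) is still built eagerly on each call --
    // the one case where an explicit isDebugEnabled() check still pays off
    // is when constructing that argument is costly.
    static int logWithDebugOff(int n) {
        formatCount = 0;
        MiniLogger log = new MiniLogger(false);
        for (int i = 0; i < n; i++) {
            log.debug("TM:remote keytab path obtained {}", "value" + i);
        }
        return formatCount;
    }

    public static void main(String[] args) {
        System.out.println(logWithDebugOff(5)); // 0
    }
}
```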
[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level
[ https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407763#comment-16407763 ] ASF GitHub Bot commented on FLINK-9011: --- Github user yew1eb commented on a diff in the pull request: https://github.com/apache/flink/pull/5712#discussion_r176047433 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext( require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - log.info("TM:remote keytab path obtained {}", remoteKeytabPath); - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + if (log.isDebugEnabled()) { --- End diff -- Sorry, I forgot to tell you that the `log.debug()` method internally calls `isDebugEnabled()`. You can see the `org.slf4j.Logger`'s log4j adapter class: ``` //org.slf4j.impl.Log4jLoggerAdapter.java ... public void debug(String format, Object arg) { if (logger.isDebugEnabled()) { FormattingTuple ft = MessageFormatter.format(format, arg); logger.log(FQCN, Level.DEBUG, ft.getMessage(), ft.getThrowable()); } } ... 
``` > YarnResourceManager spamming log file at INFO level > --- > > Key: FLINK-9011 > URL: https://issues.apache.org/jira/browse/FLINK-9011 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For every requested resource, the {{YarnResourceManager}} spams the log with > log-level INFO and the following messages: > {code} > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,181 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > 2018-03-16 03:41:20,190 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > 
"/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.proper
[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level
[ https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407770#comment-16407770 ] ASF GitHub Bot commented on FLINK-9011: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/5712#discussion_r176048790 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext( require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - log.info("TM:remote keytab path obtained {}", remoteKeytabPath); - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + if (log.isDebugEnabled()) { --- End diff -- I know; as I said in the previous comment, `and each of them would do the same judgement inside the debug method.` The outer check is there so that, when the log level is above DEBUG, the whole block is skipped. 
> YarnResourceManager spamming log file at INFO level > --- > > Key: FLINK-9011 > URL: https://issues.apache.org/jira/browse/FLINK-9011 > Project: Flink > Issue Type: Bug > Components: ResourceManager, YARN >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: vinoyang >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For every requested resource, the {{YarnResourceManager}} spams the log with > log-level INFO and the following messages: > {code} > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - Received new container: > container_1521038088305_0257_01_000102 - Remaining pending container > requests: 301 > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TaskExecutor container_1521038088305_0257_01_000102 will be > started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory > limit 3072 MB > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote keytab principal obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote yarn conf path obtained null > 2018-03-16 03:41:20,180 INFO org.apache.flink.yarn.YarnResourceManager > - TM:remote krb5 path obtained null > 2018-03-16 03:41:20,181 INFO org.apache.flink.yarn.Utils > - Copying from > file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > to > hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml > 2018-03-16 03:41:20,190 INFO org.apache.flink.yarn.YarnResourceManager > - Prepared local resource for modified yaml: resource { scheme: > "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: > 
"/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml" > } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Creating container launch context for TaskManagers > 2018-03-16 03:41:20,194 INFO org.apache.flink.yarn.YarnResourceManager > - Starting TaskManagers with command: $JAVA_HOME/bin/java > -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m > -Dlog.file=/taskmanager.log > -Dlogback.configurationFile=file:./logback.xml > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > /taskmanager.out 2> /taskmanager.err > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/5712#discussion_r176048790 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java --- @@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext( require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); - log.info("TM:remote keytab path obtained {}", remoteKeytabPath); - final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); - log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath); - final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); - log.info("TM:remote krb5 path obtained {}", remoteKrb5Path); + + if (log.isDebugEnabled()) { --- End diff -- I know; as I said in the previous comment, `and each of them would do the same judgement inside the debug method.` The outer check is there so that, when the log level is above DEBUG, the whole block is skipped. ---
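The trade-off being discussed in this thread can be sketched with a minimal logger. `MiniLogger` below is a hypothetical stand-in, not the real slf4j API: each `debug()` call checks the level internally (as in the `Log4jLoggerAdapter` excerpt above), so an outer `isDebugEnabled()` guard is never needed for correctness — it only saves the per-call level checks and the eager evaluation of the argument expressions for the whole block.

```java
// Sketch contrasting unguarded debug() calls with an isDebugEnabled()-guarded
// block. Names and behavior are illustrative, not the real slf4j classes.
public class GuardedLoggingSketch {

    static class MiniLogger {
        final boolean debugEnabled;
        int formatted = 0; // counts how many messages were actually formatted

        MiniLogger(boolean debugEnabled) { this.debugEnabled = debugEnabled; }

        boolean isDebugEnabled() { return debugEnabled; }

        // Mirrors org.slf4j.impl.Log4jLoggerAdapter#debug: the check is inside.
        void debug(String format, Object arg) {
            if (isDebugEnabled()) {
                formatted++;
                System.out.println("DEBUG " + format.replace("{}", String.valueOf(arg)));
            }
        }
    }

    public static void main(String[] args) {
        MiniLogger log = new MiniLogger(false); // DEBUG disabled

        // Unguarded: correct (nothing is formatted), but each call still pays
        // one level check and its argument expression is evaluated eagerly.
        log.debug("TM:remote keytab path obtained {}", System.getProperty("keytab.path"));
        log.debug("TM:remote krb5 path obtained {}", System.getProperty("krb5.path"));

        // Guarded: one check up front; the whole block, including argument
        // construction, is skipped when DEBUG is off.
        if (log.isDebugEnabled()) {
            log.debug("TM:remote yarn conf path obtained {}", System.getProperty("yarn.conf"));
        }

        System.out.println("messages formatted: " + log.formatted); // 0 either way
    }
}
```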
[jira] [Reopened] (FLINK-8699) Fix concurrency problem in rocksdb full checkpoint
[ https://issues.apache.org/jira/browse/FLINK-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reopened FLINK-8699: --- Reopened because the problem still exists. > Fix concurrency problem in rocksdb full checkpoint > -- > > Key: FLINK-8699 > URL: https://issues.apache.org/jira/browse/FLINK-8699 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0 >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Blocker > Fix For: 1.5.0 > > > In full checkpoint, `kvStateInformation` is not a copied object and it can be > changed while writeKVStateMetaData() is executing ... This can be > problematic, which is serious. > {code} > private void writeKVStateMetaData() throws IOException { > // ... > for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : > stateBackend.kvStateInformation.entrySet()) { > } > //... > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
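The hazard described in the issue — iterating `kvStateInformation` while it can still be modified — can be reproduced with a plain `HashMap`. The map contents and the inline "concurrent registration" below are illustrative assumptions, not Flink's actual data:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Sketch: mutating a HashMap while iterating it trips the fail-fast iterator;
// snapshotting the entries into a copy first does not.
public class SnapshotIterationSketch {

    // Buggy shape: iterate the live map; a registration arriving mid-snapshot
    // (simulated inline here) makes the iterator throw.
    static boolean iterateLive(Map<String, String> map) {
        try {
            for (Map.Entry<String, String> e : map.entrySet()) {
                map.put("state-new", "meta-new"); // concurrent registration
            }
            return true;
        } catch (ConcurrentModificationException ex) {
            return false; // fail-fast iterator detected the structural change
        }
    }

    // Fixed shape: take a copy first; later registrations cannot disturb it.
    static boolean iterateCopy(Map<String, String> map) {
        Map<String, String> copy = new HashMap<>(map);
        for (Map.Entry<String, String> e : copy.entrySet()) {
            map.put("late-" + e.getKey(), e.getValue()); // harmless now
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> kvStateInformation = new HashMap<>();
        kvStateInformation.put("state-1", "meta-1");
        kvStateInformation.put("state-2", "meta-2");

        System.out.println("live iteration survived: " + iterateLive(kvStateInformation)); // false
        System.out.println("copy iteration survived: " + iterateCopy(kvStateInformation)); // true
    }
}
```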
[jira] [Resolved] (FLINK-9018) Unclosed snapshotCloseableRegistry in RocksDBKeyedStateBackend#FullSnapshotStrategy#performSnapshot
[ https://issues.apache.org/jira/browse/FLINK-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou resolved FLINK-9018. --- Resolution: Fixed > Unclosed snapshotCloseableRegistry in > RocksDBKeyedStateBackend#FullSnapshotStrategy#performSnapshot > --- > > Key: FLINK-9018 > URL: https://issues.apache.org/jira/browse/FLINK-9018 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: Sihua Zhou >Priority: Minor > > {code} > final CloseableRegistry snapshotCloseableRegistry = new > CloseableRegistry(); > if (kvStateInformation.isEmpty()) { > if (LOG.isDebugEnabled()) { > LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed > state at {}. Returning null.", > timestamp); > } > return DoneFuture.of(SnapshotResult.empty()); > } > {code} > If the method returns in the above if block, snapshotCloseableRegistry is not > closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
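The leak pattern in the snippet above — a `CloseableRegistry` created before an early `return` that skips its `close()` — can be sketched as follows. `FakeCloseableRegistry` and the method shapes are illustrative stand-ins, not Flink's actual code:

```java
import java.util.Collections;
import java.util.Map;

// Sketch of an early-return resource leak and its fix: close the resource on
// the bail-out path too.
public class EarlyReturnLeakSketch {

    static class FakeCloseableRegistry implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // Buggy shape: registry is created before the empty-state check and is
    // never closed when the method bails out early.
    static FakeCloseableRegistry performSnapshotLeaky(Map<String, String> kvStateInformation) {
        FakeCloseableRegistry registry = new FakeCloseableRegistry();
        if (kvStateInformation.isEmpty()) {
            return registry; // BUG: returned unclosed
        }
        // ... snapshot work using 'registry' ...
        registry.close();
        return registry;
    }

    // Fixed shape: release the registry on the early-return path as well.
    static FakeCloseableRegistry performSnapshotFixed(Map<String, String> kvStateInformation) {
        FakeCloseableRegistry registry = new FakeCloseableRegistry();
        if (kvStateInformation.isEmpty()) {
            registry.close(); // FIX: close before bailing out
            return registry;
        }
        // ... snapshot work using 'registry' ...
        registry.close();
        return registry;
    }

    public static void main(String[] args) {
        FakeCloseableRegistry leaky = performSnapshotLeaky(Collections.emptyMap());
        FakeCloseableRegistry fixed = performSnapshotFixed(Collections.emptyMap());
        System.out.println("leaky early-return path closed registry: " + leaky.closed); // false
        System.out.println("fixed early-return path closed registry: " + fixed.closed); // true
    }
}
```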
[GitHub] flink pull request #5735: [FLINK-9036] [core] Add default values to State De...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5735 [FLINK-9036] [core] Add default values to State Descriptors via suppliers **This PR is based on #5732 and only the last two commits are relevant for this PR** ## What is the purpose of the change Earlier versions had a default value in `ValueState`. We dropped that, because the value would have to be duplicated on each access, to be safe against side effects when using mutable types. This pull request re-adds the feature, but using a supplier/factory function to create the default value on access. This is more efficient than copying a shared default value on access. ## Brief change log - The `StateDescriptor` produces default values through an optional `Supplier` function. - For backwards compatibility, the mode of passing a value directly is kept. The value is wrapped in a `DefaultValueFactory` which implements the legacy functionality using a serializer to copy the value on each access. ## Verifying this change - This change adds a set of unit tests - The change modifies one example program (`StateMachineExample`). Running that example shows how the change works end-to-end. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink state_default_values Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5735.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5735 commit 1c756f0d6dfe71114a97a1b9effaf321b9da063b Author: Stephan Ewen Date: 2018-03-20T14:29:12Z [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor commit 186008c609635f99e4123912a632a4e068d3c532 Author: Stephan Ewen Date: 2018-03-20T14:36:19Z [hotfix] [core] Demockitofy state descriptor tests commit 98666506c193feffb3952d9d424d3aa924f40318 Author: Stephan Ewen Date: 2018-03-20T14:44:27Z [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects. commit 1b286e4adbb5369df41c902bd161f5e854b862b8 Author: Stephan Ewen Date: 2018-03-20T15:22:12Z [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization. Throwing away TypeInformation upon serialization was previously done because the type information was not serializable. Now that it is serializable, we can (and should) keep it to provide consistent user experience, where all serializers respect the ExecutionConfig. 
commit 6064b3d49d75d40ea69a65f5e38724bf9119b526 Author: Stephan Ewen Date: 2018-03-20T15:46:13Z [hotfix] [core] Consilidate serializer duplication tests in StateDescriptorTest where possible commit a29b128f4f1bec49f1403aa21889e5890dc589ee Author: Stephan Ewen Date: 2018-03-20T16:16:06Z [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling commit f19a4721acae62f8ba578c7cb235b6a917f3a258 Author: Stephan Ewen Date: 2018-03-20T17:04:24Z [FLINK-9036] [core] Add default values to State Descriptors via suppliers commit 6d7757017f52f7c3fd7cbe99d05f1de63186d12d Author: Stephan Ewen Date: 2018-03-20T18:51:02Z [FLINK-9036] [examples] Use state default value in StateMachineExample ---
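The supplier-based default described in the PR can be sketched with a minimal value-state stand-in. `MiniValueState` is an illustrative assumption, not Flink's actual `ValueState`/`StateDescriptor` API; the point is that a supplier hands each access a fresh default, so mutable defaults need no defensive copying:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Sketch: a per-key value store whose missing values are filled in by a
// Supplier, so every key gets its own fresh (safely mutable) default.
public class SupplierDefaultSketch {

    static class MiniValueState<T> {
        private final Map<String, T> store = new HashMap<>();
        private final Supplier<T> defaultSupplier;

        MiniValueState(Supplier<T> defaultSupplier) {
            this.defaultSupplier = defaultSupplier;
        }

        // Returns the stored value, creating a fresh default on first access.
        T value(String key) {
            return store.computeIfAbsent(key, k -> defaultSupplier.get());
        }
    }

    public static void main(String[] args) {
        // A shared `new ArrayList<>()` default instance would be aliased by all
        // keys; the supplier creates one list per key instead.
        MiniValueState<List<String>> state = new MiniValueState<>(ArrayList::new);

        state.value("key-a").add("x"); // mutate key-a's default in place

        System.out.println(state.value("key-a")); // [x]
        System.out.println(state.value("key-b")); // []  -- no side effect from key-a
    }
}
```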
[jira] [Commented] (FLINK-9036) Add default value via suppliers
[ https://issues.apache.org/jira/browse/FLINK-9036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407830#comment-16407830 ] ASF GitHub Bot commented on FLINK-9036: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5735 [FLINK-9036] [core] Add default values to State Descriptors via suppliers **This PR is based on #5732 and only the last two commits are relevant for this PR** ## What is the purpose of the change Earlier versions had a default value in `ValueState`. We dropped that, because the value would have to be duplicated on each access, to be safe against side effects when using mutable types. This pull request re-adds the feature, but using a supplier/factory function to create the default value on access. This is more efficient than copying a shared default value on access. ## Brief change log - The `StateDescriptor` produces default values through an optional `Supplier` function. - For backwards compatibility, the mode of passing a value directly is kept. The value is wrapped in a `DefaultValueFactory` which implements the legacy functionality using a serializer to copy the value on each access. ## Verifying this change - This change adds a set of unit tests - The change modifies one example program (`StateMachineExample`). Running that example shows how the change works end-to-end. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink state_default_values Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5735.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5735 commit 1c756f0d6dfe71114a97a1b9effaf321b9da063b Author: Stephan Ewen Date: 2018-03-20T14:29:12Z [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor commit 186008c609635f99e4123912a632a4e068d3c532 Author: Stephan Ewen Date: 2018-03-20T14:36:19Z [hotfix] [core] Demockitofy state descriptor tests commit 98666506c193feffb3952d9d424d3aa924f40318 Author: Stephan Ewen Date: 2018-03-20T14:44:27Z [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects. commit 1b286e4adbb5369df41c902bd161f5e854b862b8 Author: Stephan Ewen Date: 2018-03-20T15:22:12Z [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization. Throwing away TypeInformation upon serialization was previously done because the type information was not serializable. 
Now that it is serializable, we can (and should) keep it to provide consistent user experience, where all serializers respect the ExecutionConfig. commit 6064b3d49d75d40ea69a65f5e38724bf9119b526 Author: Stephan Ewen Date: 2018-03-20T15:46:13Z [hotfix] [core] Consilidate serializer duplication tests in StateDescriptorTest where possible commit a29b128f4f1bec49f1403aa21889e5890dc589ee Author: Stephan Ewen Date: 2018-03-20T16:16:06Z [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling commit f19a4721acae62f8ba578c7cb235b6a917f3a258 Author: Stephan Ewen Date: 2018-03-20T17:04:24Z [FLINK-9036] [core] Add default values to State Descriptors via suppliers commit 6d7757017f52f7c3fd7cbe99d05f1de63186d12d Author: Stephan Ewen Date: 2018-03-20T18:51:02Z [FLINK-9036] [examples] Use state default value in StateMachineExample > Add default value via suppliers > --- > > Key: FLINK-9036 > URL: https://issues.apache.org/jira/browse/FLINK-9036 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.6.0 > > > Earlier versions had a default value in
[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union
[ https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407842#comment-16407842 ] Stephan Ewen commented on FLINK-9031: - That's good input, thanks. > DataSet Job result changes when adding rebalance after union > > > Key: FLINK-9031 > URL: https://issues.apache.org/jira/browse/FLINK-9031 > Project: Flink > Issue Type: Bug > Components: DataSet API, Local Runtime, Optimizer >Affects Versions: 1.3.1 >Reporter: Fabian Hueske >Priority: Critical > Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt > > > A user [reported this issue on the user mailing > list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E]. > {quote}I am using Flink 1.3.1 and I have found a strange behavior when running > the following logic: > # Read data from file and store into DataSet > # Split dataset in two, by checking if "field1" of POJOs is empty or not, so > that the first dataset contains only elements with non-empty "field1", and > the second dataset will contain the other elements. > # Each dataset is then grouped by, one by "field1" and the other by another > field, and subsequently reduced. > # The 2 datasets are merged together by union. > # The final dataset is written as json. > What I expected, from the output, was to find only one element with a > specific value of "field1" because: > # Reducing the first dataset grouped by "field1" should generate only one > element with a specific value of "field1". > # The second dataset should contain only elements with empty "field1". > # Making a union of them should not duplicate any record. > This does not happen. When I read the generated JSON files I see some duplicate > (non-empty) values of "field1". > Strangely this does not happen when the union between the two datasets is > not computed. 
In this case the first dataset produces elements only with > distinct values of "field1", while the second dataset produces only records with > empty field "value1". > {quote} > The user has not enabled object reuse. > Later he reports that injecting a rebalance() > after the union resolves the problem. I had a look at the execution plans for > both cases (attached to this issue) but could not identify a problem. > Hence I assume this might be an issue with the runtime code, but we need to > look deeper into this. The user also provided an example program consisting > of two classes which are attached to the issue as well. > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5729: [FLINK-7343][kafka-tests] Fix test at-least-once test ins...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5729 merging. ---
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407853#comment-16407853 ] ASF GitHub Bot commented on FLINK-7343: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5729 merging. > Kafka010ProducerITCase instability > -- > > Key: FLINK-7343 > URL: https://issues.apache.org/jira/browse/FLINK-7343 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > As reported by [~till.rohrmann] in > https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test > instability with > `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` > https://travis-ci.org/tillrohrmann/flink/jobs/258538641 > It is probably related to log.flush intervals in Kafka, which delay flushing > the data to files and potentially causing data loses on killing Kafka brokers > in the tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
[ https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407852#comment-16407852 ] ASF GitHub Bot commented on FLINK-8073: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5718 merging. > Test instability > FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint() > - > > Key: FLINK-8073 > URL: https://issues.apache.org/jira/browse/FLINK-8073 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5718: [FLINK-8073][kafka-tests] Disable timeout in tests
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5718 merging. ---
[jira] [Created] (FLINK-9043) Flink recover from checkpoint like Spark Streaming
godfrey johnson created FLINK-9043: -- Summary: Flink recover from checkpoint like Spark Streaming Key: FLINK-9043 URL: https://issues.apache.org/jira/browse/FLINK-9043 Project: Flink Issue Type: New Feature Reporter: godfrey johnson I know a Flink job can recover from a checkpoint with a restart strategy, but it cannot recover the way Spark Streaming jobs do when the job is starting. Every time, the submitted Flink job is regarded as a new job, whereas a Spark Streaming job first detects the checkpoint directory and then recovers from the latest successful checkpoint. Flink, however, can only recover after the job has failed first, and then retries according to the strategy. So, would Flink support recovering from the checkpoint directly in a new job? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5708: [FLINK-8984][network] Drop taskmanager.exactly-once.block...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5708 merging. ---
[jira] [Commented] (FLINK-8984) Disabling credit based flow control deadlocks Flink on checkpoint
[ https://issues.apache.org/jira/browse/FLINK-8984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407856#comment-16407856 ] ASF GitHub Bot commented on FLINK-8984: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5708 merging. > Disabling credit based flow control deadlocks Flink on checkpoint > - > > Key: FLINK-8984 > URL: https://issues.apache.org/jira/browse/FLINK-8984 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.5.0 > > > This is configuration issue. There are two options: > taskmanager.network.credit-based-flow-control.enabled > and > taskmanager.exactly-once.blocking.data.enabled > If we disable first one, but remain default value for the second one > deadlocks will occur. I think we can safely drop the second config value > altogether and always use blocking BarrierBuffer for credit based flow > control and spilling BarrierBuffer for non credit based flow control. > cc [~zjwang] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5717: [FLINK-9020][E2E Tests] Use separate modules per testcase
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5717 merging. ---
[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules
[ https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407858#comment-16407858 ] ASF GitHub Bot commented on FLINK-9020: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5717 merging. > Move test projects of end-to-end tests in separate modules > -- > > Key: FLINK-9020 > URL: https://issues.apache.org/jira/browse/FLINK-9020 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Florian Schmidt >Assignee: Florian Schmidt >Priority: Major > > I would like to propose to move each test case in the end-to-end tests into > it's own module. Reason is that currently we are building all jars for the > tests from one pom.xml, which makes it hard to have specific tests for > certain build types (e.g. examples derived from the flink quickstart > archetype). > For the current state this would mean > - change packaging from flink-end-to-end-tests from jar to pom > - refactor the classloader example to be in its own module -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover
[ https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407863#comment-16407863 ] Till Rohrmann commented on FLINK-8946: -- I think we should close the {{TaskManagerMetricGroup}} at some point. E.g. in {{TaskExecutor#postStop}} or in the caller of {{TaskExecutor}}. > TaskManager stop sending metrics after JobManager failover > -- > > Key: FLINK-8946 > URL: https://issues.apache.org/jira/browse/FLINK-8946 > Project: Flink > Issue Type: Bug > Components: Metrics, TaskManager >Affects Versions: 1.4.2 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Critical > Fix For: 1.5.0 > > > Running in Yarn-standalone mode, when the Job Manager performs a failover, > all TaskManagers that are inherited from the previous JobManager will not send > metrics to the new JobManager and other registered metric reporters. > > A cursory glance reveals that these lines of code might be the cause > [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086] > Perhaps the TaskManager closes its metrics group when disassociating from the > JobManager, but does not create a new one on fail-over association? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)