[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407742#comment-16407742
 ] 

ASF GitHub Bot commented on FLINK-8394:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5725


> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as is done 
> in other methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
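
For reference, a minimal sketch of the change the report asks for (note that the review below concludes the synchronization is unnecessary). The class and fields are simplified stand-ins, not Flink's actual ReceiverThread:
{code}
import java.util.concurrent.CompletableFuture;

// simplified stand-in for the benchmark's ReceiverThread, showing the proposed locking
class ReceiverThread extends Thread {

	private final Object lock = new Object();

	private volatile boolean running = true;
	private final CompletableFuture<Long> expectedRecord = new CompletableFuture<>();

	public void shutdown() {
		running = false;
		interrupt();
		synchronized (lock) {
			// guard expectedRecord with the same monitor the other methods would use
			expectedRecord.complete(0L);
		}
	}
}
{code}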


[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...

2018-03-21 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5725
  
@StephanEwen OK, let me close this PR, thanks.


---


[GitHub] flink pull request #5725: [FLINK-8394] Lack of synchronization accessing exp...

2018-03-21 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5725


---


[jira] [Commented] (FLINK-8394) Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407739#comment-16407739
 ] 

ASF GitHub Bot commented on FLINK-8394:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5725
  
Thank you for trying to help improve Flink.

I think, however, this patch is not necessary. The method does not require 
synchronization; it should be correct as it is.


> Lack of synchronization accessing expectedRecord in ReceiverThread#shutdown
> ---
>
> Key: FLINK-8394
> URL: https://issues.apache.org/jira/browse/FLINK-8394
> Project: Flink
>  Issue Type: Test
>  Components: Streaming
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   public void shutdown() {
> running = false;
> interrupt();
> expectedRecord.complete(0L);
> {code}
> Access to expectedRecord should be protected by synchronization, as is done 
> in other methods.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument

2018-03-21 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9040:
-

Assignee: Sihua Zhou

> JobVertex#setMaxParallelism does not validate argument
> ---
>
> Key: FLINK-9040
> URL: https://issues.apache.org/jira/browse/FLINK-9040
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Sihua Zhou
>Priority: Minor
>
> {code}
> /**
> * Sets the maximum parallelism for the task.
> *
> * @param maxParallelism The maximum parallelism to be set. must be between 1 
> and Short.MAX_VALUE.
> */
> public void setMaxParallelism(int maxParallelism) {
>   this.maxParallelism = maxParallelism;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
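
A sketch of the missing validation; the 1 to Short.MAX_VALUE bounds come straight from the quoted Javadoc, while the choice of IllegalArgumentException is an assumption:
{code}
/**
 * Sets the maximum parallelism for the task.
 *
 * @param maxParallelism The maximum parallelism to be set. Must be between 1
 *                       and Short.MAX_VALUE.
 */
public void setMaxParallelism(int maxParallelism) {
	if (maxParallelism < 1 || maxParallelism > Short.MAX_VALUE) {
		throw new IllegalArgumentException(
			"maxParallelism must be between 1 and " + Short.MAX_VALUE
				+ ", but was " + maxParallelism);
	}
	this.maxParallelism = maxParallelism;
}
{code}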


[GitHub] flink issue #5725: [FLINK-8394] Lack of synchronization accessing expectedRe...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5725
  
Thank you for trying to help improve Flink.

I think, however, this patch is not necessary. The method does not require 
synchronization; it should be correct as it is.


---


[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407737#comment-16407737
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5729
  
Sweet, nice to see this fixed.

Code looks good, +1 to merge!


> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> As reported by [~till.rohrmann] in 
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test 
> instability with 
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing 
> the data to files and can potentially cause data losses when killing Kafka brokers 
> in the tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
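
For context, these are the two standard Kafka broker settings behind the log.flush intervals mentioned above. A hedged sketch of how a test harness might pin them so that an acknowledged record is flushed before a broker is killed; whether the actual fix in #5729 took this route is not stated here:
{code}
import java.util.Properties;

// hypothetical test-broker configuration forcing eager flushes
public class EagerFlushBrokerConfig {

	public static Properties eagerFlushProperties() {
		Properties props = new Properties();
		// fsync the log after every single message instead of batching
		props.setProperty("log.flush.interval.messages", "1");
		// and at most 1 ms after a message arrives
		props.setProperty("log.flush.interval.ms", "1");
		return props;
	}
}
{code}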


[GitHub] flink issue #5729: [FLINK-7343][kafka-tests] Fix test at-least-once test ins...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5729
  
Sweet, nice to see this fixed.

Code looks good, +1 to merge!


---


[jira] [Created] (FLINK-9040) JobVertex#setMaxParallelism does not validate argument

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9040:
---

 Summary: JobVertex#setMaxParallelism does not validate argument
 Key: FLINK-9040
 URL: https://issues.apache.org/jira/browse/FLINK-9040
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.5.0
Reporter: Chesnay Schepler


{code}
/**
* Sets the maximum parallelism for the task.
*
* @param maxParallelism The maximum parallelism to be set. must be between 1 
and Short.MAX_VALUE.
*/
public void setMaxParallelism(int maxParallelism) {
this.maxParallelism = maxParallelism;
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407732#comment-16407732
 ] 

ASF GitHub Bot commented on FLINK-9020:
---

Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5717
  
Thanks @zentol! I addressed your comments and will squash if approved


> Move test projects of end-to-end tests in separate modules
> --
>
> Key: FLINK-9020
> URL: https://issues.apache.org/jira/browse/FLINK-9020
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I would like to propose moving each test case in the end-to-end tests into 
> its own module. The reason is that currently we are building all jars for the 
> tests from one pom.xml, which makes it hard to have specific tests for 
> certain build types (e.g. examples derived from the flink quickstart 
> archetype).
> For the current state this would mean:
> - change the packaging of flink-end-to-end-tests from jar to pom
> - refactor the classloader example to be in its own module



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5717: [FLINK-9020][E2E Tests] Use separate modules per testcase

2018-03-21 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5717
  
Thanks @zentol! I addressed your comments and will squash if approved


---


[jira] [Commented] (FLINK-9010) NoResourceAvailableException with FLIP-6

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407726#comment-16407726
 ] 

Till Rohrmann commented on FLINK-9010:
--

Does your Yarn cluster have enough resources to run this program? If your 
program consists of 2 operators and you run it with DOP 400, then it requires 
800 logical slots. If the two operators are in the same slot sharing 
group, then two logical slots will be deployed to the same {{TaskExecutor}} 
slot. Thus, I'm not sure whether this is an actual problem here. 

Please verify and if this is the case, then let's close this issue.

> NoResourceAvailableException with FLIP-6 
> -
>
> Key: FLINK-9010
> URL: https://issues.apache.org/jira/browse/FLINK-9010
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> I was trying to run a bigger program with 400 slots (100 TMs, 2 slots each) 
> with FLIP-6 mode and a checkpointing interval of 1000 and got the following 
> exception:
> {code}
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000101 - Remaining pending container 
> requests: 302
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000101 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,154 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,155 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml
> 2018-03-16 03:41:20,165 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/3cd0c7d7-ccc1-4680-83a5-54e64dd637bc-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680164 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,168 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> 2018-03-16 03:41:20,176 INFO  
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
> Opening proxy : ip-172-31-3-221.eu-west-1.compute.internal:8041
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>  
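
A worked sketch of the slot arithmetic from Till's comment above, as a hypothetical two-operator job (not the program from the report):
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(400);

		// source and map at DOP 400 each: 800 logical slots in total. In the
		// default slot sharing group, one logical slot of each operator shares
		// a single TaskExecutor slot, so 400 physical slots suffice.
		env.generateSequence(0, 1_000_000)
			.map(new MapFunction<Long, Long>() {
				@Override
				public Long map(Long value) {
					return 2 * value;
				}
			})
			.print();

		env.execute("slot sharing sketch");
	}
}
{code}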

[GitHub] flink pull request #5717: [FLINK-9020][E2E Tests] Use separate modules per t...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5717#discussion_r176032646
  
--- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml ---
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>parent-child-classloading-test</artifactId>
--- End diff --

* add `flink` prefix
* add `_${scala.binary.version}` suffix
* add `<name>flink-parent-child-classloading-test</name>` (this is what 
allows us to omit the scala stuff when listing child modules)


---


[jira] [Commented] (FLINK-9020) Move test projects of end-to-end tests in separate modules

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407724#comment-16407724
 ] 

ASF GitHub Bot commented on FLINK-9020:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5717#discussion_r176032646
  
--- Diff: flink-end-to-end-tests/parent-child-classloading-test/pom.xml ---
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>parent-child-classloading-test</artifactId>
--- End diff --

* add `flink` prefix
* add `_${scala.binary.version}` suffix
* add `<name>flink-parent-child-classloading-test</name>` (this is what 
allows us to omit the scala stuff when listing child modules)


> Move test projects of end-to-end tests in separate modules
> --
>
> Key: FLINK-9020
> URL: https://issues.apache.org/jira/browse/FLINK-9020
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I would like to propose moving each test case in the end-to-end tests into 
> its own module. The reason is that currently we are building all jars for the 
> tests from one pom.xml, which makes it hard to have specific tests for 
> certain build types (e.g. examples derived from the flink quickstart 
> archetype).
> For the current state this would mean:
> - change the packaging of flink-end-to-end-tests from jar to pom
> - refactor the classloader example to be in its own module



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407705#comment-16407705
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176029439
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
--- End diff --

@zentol the `kafka_cleanup` trap also includes this, which shuts down the 
Flink cluster and checks logs for errors.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the general purpose testing job 
> FLINK-8971 with failures disabled for that.
> The end-to-end test should do the following:
> * Submit FLINK-8971 job
> * Verify that the savepoint is there
> * Cancel job and resume from savepoint
> * Verify that job could be resumed
> * Use different StateBackends: RocksDB incremental async/sync, RocksDB full 
> async/sync, FsStateBackend async/sync



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176029439
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
--- End diff --

@zentol the `kafka_cleanup` trap also includes this, which shuts down the 
Flink cluster and checks logs for errors.


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407703#comment-16407703
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176029020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

I was intending to catch a possible `RuntimeException`... I will just remove 
the `try catch`.


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
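
For context, a self-contained sketch of the shutdown pattern the diff above relies on: run every cleanup step, keep the first failure, suppress the rest (the role ExceptionUtils.firstOrSuppressed plays in Flink). Names are illustrative, not Flink's actual code:
{code}
import java.util.Arrays;
import java.util.List;

public class ShutdownSketch {
	public static void main(String[] args) throws Exception {
		List<AutoCloseable> steps = Arrays.<AutoCloseable>asList(
			() -> System.out.println("file cache shut down"),
			() -> { throw new IllegalStateException("one step failed"); },
			// like TaskManagerMetricGroup#close(), runs last and, per the
			// review above, is not expected to throw
			() -> System.out.println("task manager metric group closed"));

		Throwable first = null;
		for (AutoCloseable step : steps) {
			try {
				step.close();
			} catch (Throwable t) {
				if (first == null) {
					first = t;              // keep the first failure
				} else {
					first.addSuppressed(t); // suppress the rest
				}
			}
		}

		if (first != null) {
			throw new Exception("Error during shutdown.", first);
		}
	}
}
{code}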


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176029020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

I was intending to catch a possible `RuntimeException`... I will just remove 
the `try catch`.


---


[jira] [Resolved] (FLINK-9028) flip6 should check config before starting cluster

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9028.
--
Resolution: Fixed

Fixed via
master:
38aa863d5a710b283b5c9b2eb9225d6fb9cc0c70
7c952dd3a75bc64d10bf9be12e405bbc349422b1

1.5.0:
fecc19088b36fc4c8bca5ff39ba756f8fd71
c6f91334b67d589f0c17ed75c9dbcbaedaf8ba51

> flip6 should check config before starting cluster
> -
>
> Key: FLINK-9028
> URL: https://issues.apache.org/jira/browse/FLINK-9028
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> In flip6, we should perform parameter checking before starting the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8980) End-to-end test: BucketingSink

2018-03-21 Thread Florian Schmidt (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt reassigned FLINK-8980:
--

Assignee: Florian Schmidt

> End-to-end test: BucketingSink
> --
>
> Key: FLINK-8980
> URL: https://issues.apache.org/jira/browse/FLINK-8980
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In order to verify the {{BucketingSink}}, we should add an end-to-end test 
> which verifies that the {{BucketingSink}} does not lose data under failures.
> An idea would be to have a CountUp job which simply counts up a counter which 
> is persisted. The emitted values will be written to disk by the 
> {{BucketingSink}}. Now we should randomly kill Flink processes (cluster 
> entrypoint and TaskExecutors) to simulate failures. Even after these 
> failures, the written files should contain the correct sequence of numbers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
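
A rough sketch of the CountUp idea described above (illustrative, not the actual test job; in particular, the counter would need to be checkpointed state for the failure scenario to be meaningful):
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class CountUpJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(1000);

		env.addSource(new SourceFunction<Long>() {
			private volatile boolean running = true;
			private long counter; // the real test must snapshot this in checkpoints

			@Override
			public void run(SourceContext<Long> ctx) throws Exception {
				while (running) {
					synchronized (ctx.getCheckpointLock()) {
						ctx.collect(counter++);
					}
					Thread.sleep(10);
				}
			}

			@Override
			public void cancel() {
				running = false;
			}
		})
		.map(new MapFunction<Long, String>() {
			@Override
			public String map(Long value) {
				return String.valueOf(value);
			}
		})
		.addSink(new BucketingSink<String>("/tmp/count-up-out"));

		env.execute("CountUp with BucketingSink");
	}
}
{code}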


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407700#comment-16407700
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176028721
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

The only trap I see is `kafka_cleanup`, which shuts down ZK and kafka. 
What about the state machine though?

[jira] [Resolved] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9022.
--
Resolution: Fixed

Fixed via
master: f9df13c5058f194a5c686b9b753345d9226fc87a
1.5.0: 27189d8058d6c3bc00dbc8409f40bedbccf01ac5

> fix resource close in 
> `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
> ---
>
> Key: FLINK-9022
> URL: https://issues.apache.org/jira/browse/FLINK-9022
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We have the following code in 
> {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
> incorrect:
> {code}
> } catch (Exception ex) {
>   // cleanup if something went wrong before results got published.
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend); // this should close 
> operatorStateBackend
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
>   IOUtils.closeQuietly(rawKeyedStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   throw new Exception("Exception while creating 
> StreamOperatorStateContext.", ex);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
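
Read together, the description implies this corrected fragment: close operatorStateBackend in the second branch and drop the duplicated rawOperatorStateInputs block (a sketch; the actual fix is in the commits referenced above):
{code}
} catch (Exception ex) {
	// cleanup if something went wrong before results got published.
	if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
		IOUtils.closeQuietly(keyedStatedBackend);
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
		IOUtils.closeQuietly(operatorStateBackend); // was closing keyedStatedBackend
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
		IOUtils.closeQuietly(rawKeyedStateInputs);
	}
	if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
		IOUtils.closeQuietly(rawOperatorStateInputs);
	}
	throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
}
{code}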


[jira] [Commented] (FLINK-9028) flip6 should check config before starting cluster

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407698#comment-16407698
 ] 

ASF GitHub Bot commented on FLINK-9028:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5726


> flip6 should check config before starting cluster
> -
>
> Key: FLINK-9028
> URL: https://issues.apache.org/jira/browse/FLINK-9028
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> In flip6, we should perform parameter checking before starting the cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407697#comment-16407697
 ] 

ASF GitHub Bot commented on FLINK-9022:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5716


> fix resource close in 
> `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
> ---
>
> Key: FLINK-9022
> URL: https://issues.apache.org/jira/browse/FLINK-9022
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We have the following code in 
> {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
> incorrect:
> {code}
> } catch (Exception ex) {
>   // cleanup if something went wrong before results got published.
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
>   IOUtils.closeQuietly(keyedStatedBackend); // this should close 
> operatorStateBackend
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
>   IOUtils.closeQuietly(rawKeyedStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   if 
> (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
>   IOUtils.closeQuietly(rawOperatorStateInputs);
>   }
>   throw new Exception("Exception while creating 
> StreamOperatorStateContext.", ex);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176028721
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

The only trap I see is `kafka_cleanup`, which shuts down ZK and kafka. 
What about the state machine though?


---


[GitHub] flink pull request #5726: [FLINK-9028][flip6] perform parameters checking be...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5726


---


[GitHub] flink pull request #5716: [FLINK-9022][state] fix resource release in Stream...

2018-03-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5716


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407695#comment-16407695
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

this method never throws exceptions


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407696#comment-16407696
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028213
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -265,7 +265,11 @@ class TaskManager(
   case t: Exception => log.error("FileCache did not shutdown 
properly.", t)
 }
 
-taskManagerMetricGroup.close()
+try {
+  taskManagerMetricGroup.close()
--- End diff --

same as above


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028213
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -265,7 +265,11 @@ class TaskManager(
   case t: Exception => log.error("FileCache did not shutdown 
properly.", t)
 }
 
-taskManagerMetricGroup.close()
+try {
+  taskManagerMetricGroup.close()
--- End diff --

same as above


---


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5734#discussion_r176028191
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -292,6 +292,13 @@ public void start() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
}
 
+   try {
+   // it will call close() recursively from the parent to 
children
+   taskManagerMetricGroup.close();
--- End diff --

this method never throws exceptions


---


[jira] [Created] (FLINK-9039) Broken link to Hadoop Setup Guide in docs

2018-03-21 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9039:
--

 Summary: Broken link to Hadoop Setup Guide in docs
 Key: FLINK-9039
 URL: https://issues.apache.org/jira/browse/FLINK-9039
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.2
Reporter: Florian Schmidt


On 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/dependencies.html]
 under the section "Hadoop Dependencies" there is a link to "Hadoop Setup Guide", 
which links to 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/hadoop.html],
 which in turn does not exist.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407692#comment-16407692
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027636
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

That's a good idea! I'll give this approach a try.


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the general purpose testing job 
> FLINK-8971 with failures disabled for that.

[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407691#comment-16407691
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027456
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

when the script exits, a c

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027636
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

That's a good idea! I'll give this approach a try.


---


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176027456
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

when the script exits, a cleanup hook shuts down the cluster. It also 
parses the logs for any unexpected errors; if there is one, the test fails.
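
(For context, a minimal sketch of what such an exit hook could look like, assuming a `stop_cluster` helper in common.sh; the grep patterns and failure handling below are illustrative, not the actual common.sh implementation:)

```bash
# Sketch: on exit, tear down the cluster, then scan the logs and fail
# the test if anything unexpected shows up.
function cleanup_and_check_logs {
  stop_cluster  # assumed common.sh helper that shuts down the cluster

  # hypothetical check: any error or exception in the Flink logs fails the test
  if grep -riq -e "error" -e "exception" "$FLINK_DIR/log"; then
    echo "Found unexpected errors in the logs; failing the test."
    exit 1
  fi
}
trap cleanup_and_check_logs EXIT
```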


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407688#comment-16407688
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5734
  
CC: @tillrohrmann 


> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407686#comment-16407686
 ] 

ASF GitHub Bot commented on FLINK-9026:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5734

[FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the 
TaskExecutor is shut down

## What is the purpose of the change

We should close the `TaskManagerMetricGroup` when the `TaskExecutor` is 
shut down.

## Brief change log

- close the `TaskManagerMetricGroup` when the `TaskExecutor` is shut down.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK_9026

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5734


commit 94ecbc87e53b4fff306a864971f164c765122194
Author: sihuazhou 
Date:   2018-03-21T09:46:31Z

close the TaskManagerMetricGroup when the TaskExecutor is shut down




> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricGroup wh...

2018-03-21 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5734
  
CC: @tillrohrmann 


---


[GitHub] flink pull request #5734: [FLINK-9026][Metrics] Close the TaskManagerMetricG...

2018-03-21 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5734

[FLINK-9026][Metrics] Close the TaskManagerMetricGroup when the 
TaskExecutor is shut down

## What is the purpose of the change

We should close the `TaskManagerMetricGroup` when the `TaskExecutor` is 
shut down.

## Brief change log

- close the `TaskManagerMetricGroup` when the `TaskExecutor` is shut down.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK_9026

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5734.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5734


commit 94ecbc87e53b4fff306a864971f164c765122194
Author: sihuazhou 
Date:   2018-03-21T09:46:31Z

close the TaskManagerMetricGroup when the TaskExecutor is shut down




---


[jira] [Commented] (FLINK-9031) DataSet Job result changes when adding rebalance after union

2018-03-21 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407683#comment-16407683
 ] 

Fabian Hueske commented on FLINK-9031:
--

Adding relevant information from the mail thread:

[~StephanEwen] suggested
{quote}
To diagnose that, can you please check the following:
   - Change the Person data type to be immutable (final fields, no setters, set 
fields in constructor instead). Does that make the problem go away?
   - Change the Person data type to not be a POJO by adding a dummy field that 
is never used, but does not have a getter/setter. 
Does that make the problem go away?
If either of that is the case, it must be a mutability bug somewhere in either 
accidental object reuse or accidental serializer sharing.
{quote}
 
Making the Person object immutable solved the problem.

> DataSet Job result changes when adding rebalance after union
> 
>
> Key: FLINK-9031
> URL: https://issues.apache.org/jira/browse/FLINK-9031
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime, Optimizer
>Affects Versions: 1.3.1
>Reporter: Fabian Hueske
>Priority: Critical
> Attachments: Person.java, RunAll.java, newplan.txt, oldplan.txt
>
>
> A user [reported this issue on the user mailing 
> list|https://lists.apache.org/thread.html/075f1a487b044079b5d61f199439cb77dd4174bd425bcb3327ed7dfc@%3Cuser.flink.apache.org%3E].
> {quote}I am using Flink 1.3.1 and I have found a strange behavior on running 
> the following logic:
>  # Read data from file and store into DataSet
>  # Split the dataset in two by checking whether "field1" of the POJOs is empty, so 
> that the first dataset contains only elements with a non-empty "field1", and 
> the second dataset will contain the other elements.
>  # Each dataset is then grouped, one by "field1" and the other by another 
> field, and subsequently reduced.
>  # The 2 datasets are merged together by union.
>  # The final dataset is written as json.
> What I expected, from the output, was to find only one element with a 
> specific value of "field1" because:
>  # Reducing the first dataset grouped by "field1" should generate only one 
> element with a specific value of "field1".
>  # The second dataset should contain only elements with empty "field1".
>  # Making a union of them should not duplicate any record.
> This does not happen. When I read the generated JSONs I see some duplicate 
> (non-empty) values of "field1".
>  Strangely this does not happen when the union between the two datasets is 
> not computed. In this case the first dataset produces elements only with 
> distinct values of "field1", while the second dataset produces only records with 
> empty field "value1".
> {quote}
> The user has not enabled object reuse.
> Later he reports that injecting a rebalance() after the union makes the 
> problem disappear. I had a look at the execution plans for 
> both cases (attached to this issue) but could not identify a problem.
> Hence I assume, this might be an issue with the runtime code but we need to 
> look deeper into this. The user also provided an example program consisting 
> of two classes which are attached to the issue as well.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407670#comment-16407670
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176020327
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

you could use the log4j reporter and grep the logs.
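
(A sketch of that approach: the reporter class below is Flink's `Slf4jReporter`, but the reporting interval, the `numRecordsIn` metric name, and the log-file pattern are assumptions for illustration:)

{code}
# flink-conf.yaml: periodically dump all metrics into the logs
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 10 SECONDS

# test script: instead of a fixed sleep, wait until the job has actually
# processed records by grepping the reported numRecordsIn counter
while ! grep -q "numRecordsIn" $FLINK_DIR/log/*taskexecutor*.log; do
  sleep 1
done
{code}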


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the gen

[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407669#comment-16407669
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176019938
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

when do the jobs shut down if no error occurs?

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176020327
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

you could use the log4j reporter and grep the logs.


---


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176019938
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  
KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz";
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c 
org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# resume state machine job with savepoint
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -d 
$FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+sleep 15
+
+# if state is erroneous and the state machine job produces alerting state transitions,
--- End diff --

when do the jobs shut down if no error occurs?


---


[jira] [Closed] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-9038.

Resolution: Duplicate

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {code}
> Test 
> testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
>  failed with:
> java.lang.AssertionError: Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
>  with a prohibited string (one of [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]). Excerpts:
> [
> 2018-03-20 15:44:20,533 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> ]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
>   at 
> org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407641#comment-16407641
 ] 

Till Rohrmann commented on FLINK-9038:
--

I think this one is a duplicate of FLINK-8899, or vice versa.

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {code}
> Test 
> testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
>  failed with:
> java.lang.AssertionError: Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
>  with a prohibited string (one of [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]). Excerpts:
> [
> 2018-03-20 15:44:20,533 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> ]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
>   at 
> org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}

[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407636#comment-16407636
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Changes look good @yanghua. Merging this PR.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8909) pyflink.sh not working with yarn

2018-03-21 Thread Hitesh Tiwari (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hitesh Tiwari closed FLINK-8909.

Resolution: Not A Problem

> pyflink.sh not working with yarn
> 
>
> Key: FLINK-8909
> URL: https://issues.apache.org/jira/browse/FLINK-8909
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Hitesh Tiwari
>Priority: Blocker
>
> Hi,
> I want to run the python application from pyflink.sh with yarn-cluster mode. 
> Added "-m yarn-cluster -yn 1" in pyflink.sh, so my updated pyflink.sh is 
> executing the command below:
> "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1  -v 
> "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"
>  Running pyflink.sh:
> ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py
> While running  getting below Error:
> java.lang.Exception: The user defined 'open()' method caused an exception: An 
> error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ... 1 more
> 03/09/2018 11:20:23 Job execution switched to status FAILING.
>  java.lang.Exception: The user defined 'open()' method caused an exception: 
> An error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)

[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-21 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Changes look good @yanghua. Merging this PR.


---


[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9038:

Description: https://travis-ci.org/apache/flink/jobs/355821305

> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9038:
---

 Summary: YARNSessionCapacitySchedulerITCase fails on travis
 Key: FLINK-9038
 URL: https://issues.apache.org/jira/browse/FLINK-9038
 Project: Flink
  Issue Type: Bug
  Components: Tests, YARN
Affects Versions: 1.5.0
Reporter: Chesnay Schepler
 Fix For: 1.5.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8909) pyflink.sh not working with yarn

2018-03-21 Thread Hitesh Tiwari (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407633#comment-16407633
 ] 

Hitesh Tiwari commented on FLINK-8909:
--

Thanks, 
Configured `python.dc.tmp.dir` and it worked for us.
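
(For anyone hitting the same FileNotFoundException on /tmp/flink_dc: the fix is to point the Python API's distributed-cache staging directory at a path that the user running the YARN containers can access. A sketch, with an illustrative path:)

{code}
# flink-conf.yaml: override the default /tmp/flink_dc staging directory used
# by the Python API's distributed cache; the path below is an example and must
# be readable by the container user (here, 'yarn')
python.dc.tmp.dir: /var/flink/python_dc
{code}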


> pyflink.sh not working with yarn
> 
>
> Key: FLINK-8909
> URL: https://issues.apache.org/jira/browse/FLINK-8909
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.2
>Reporter: Hitesh Tiwari
>Priority: Blocker
>
> Hi,
> I want to run the python application from pyflink.sh with yarn-cluster mode. 
> Added "-m yarn-cluster -yn 1" in pyflink.sh, so my updated pyflink.sh is 
> executing the command below:
> "$FLINK_BIN_DIR"/flink run -m yarn-cluster -yn 1  -v 
> "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"
>  Running pyflink.sh:
> ./bin/pyflink.sh /opt/pnda/hitesh/flink-1.4.2/examples/python/WordCount.py
> While running  getting below Error:
> java.lang.Exception: The user defined 'open()' method caused an exception: An 
> error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  ... 1 more
> 03/09/2018 11:20:23 Job execution switched to status FAILING.
>  java.lang.Exception: The user defined 'open()' method caused an exception: 
> An error occurred while copying the file.
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:485)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.lang.RuntimeException: An error occurred while copying the 
> file.
>  at 
> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:114)
>  at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.open(PythonStreamer.java:100)
>  at 
> org.apache.flink.python.api.functions.PythonMapPartition.open(PythonMapPartition.java:53)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>  ... 3 more
>  Caused by: java.io.FileNotFoundException: File /tmp/flink_dc does not exist 
> or the user running Flink ('yarn') has insufficient permissions to access it.
>  at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
>  at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:241)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:318)
>  at 
> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:302)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)

[jira] [Updated] (FLINK-9038) YARNSessionCapacitySchedulerITCase fails on travis

2018-03-21 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9038:

Description: 
https://travis-ci.org/apache/flink/jobs/355821305

{code}
Test 
testNonexistingQueueWARNmessage(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
 failed with:
java.lang.AssertionError: Found a file 
/home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-capacityscheduler/flink-yarn-tests-capacityscheduler-logDir-nm-0_0/application_1521560575412_0004/container_1521560575412_0004_01_01/jobmanager.log
 with a prohibited string (one of [Exception, Started 
SelectChannelConnector@0.0.0.0:8081]). Excerpts:
[
2018-03-20 15:44:20,533 ERROR 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
on heartbeat
]
at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:394)
at 
org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase.checkForProhibitedLogContents(YARNSessionCapacitySchedulerITCase.java:630)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
at 
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
{code}

  was:https://travis-ci.org/apache/flink/jobs/355821305


> YARNSessionCapacitySchedulerITCase fails on travis
> --
>
> Key: FLINK-9038
> URL: https://issues.apache.org/jira/browse/FLINK-9038
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> https://travis-ci.org/apache/flink/jobs/355821305
> {cod

[jira] [Updated] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-03-21 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8899:
-
Priority: Major  (was: Minor)

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: flip-6
>
> Occasionally, running a simple word count as this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
>   at o

[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407621#comment-16407621
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176011796
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

An alternative to this:
Query the "read-records" metric via the REST API, and only proceed after the
job has processed a given number of records.
This, however, requires adding an extra dependency, such as jq, for the
response parsing.
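
For illustration, such a poll could also be done without jq, e.g. from a small
Java helper. This is only a sketch: the REST endpoint layout, the metric name
numRecordsIn (standing in for "read-records"), and the JSON shape of the
response are all assumptions here, not something taken from the PR.

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class MetricPoller {

    // Blocks until the given vertex reports at least minRecords records read.
    // Assumed endpoint layout: /jobs/<jobId>/vertices/<vertexId>/metrics?get=<name>
    public static void waitForRecords(
            String restBase, String jobId, String vertexId, long minRecords)
            throws Exception {

        String url = restBase + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/metrics?get=numRecordsIn";

        while (readMetric(url) < minRecords) {
            Thread.sleep(1000);
        }
    }

    // Naive parsing of an assumed response like [{"id":"numRecordsIn","value":"42"}]
    private static long readMetric(String url) throws Exception {
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new URL(url).openStream()))) {
            for (String line; (line = in.readLine()) != null; ) {
                body.append(line);
            }
        }
        int idx = body.indexOf("\"value\":\"");
        if (idx < 0) {
            return 0L;
        }
        int start = idx + "\"value\":\"".length();
        return Long.parseLong(body.substring(start, body.indexOf("\"", start)));
    }
}
{code}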


> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> User usual

[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5733#discussion_r176011796
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,102 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# this test runs 2 streaming jobs; adding extra taskmanagers for more slots
+add_taskmanagers 1
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix kafka config
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end
+
+function kafka_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap kafka_cleanup INT
+trap kafka_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 200 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+# wait a bit to have some events pass through the state machine
+sleep 15
--- End diff --

An alternative to this:
Query the "read-records" metric via the REST API, and only proceed after the
job has processed a given number of records.
This, however, requires adding an extra dependency, such as jq, for the
response parsing.


---


[jira] [Commented] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-03-21 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407618#comment-16407618
 ] 

Nico Kruber commented on FLINK-8899:


It should, however, also not be at "Minor" priority, as this may affect user 
experience, as do all the other exceptions mentioned (which should get their own 
JIRA tickets). Every exception in the log will potentially make users (and us) 
investigate it and burn a lot of time.

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Minor
>  Labels: flip-6
>
> Occasionally, running a simple word count as this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl

[jira] [Commented] (FLINK-8975) End-to-end test: Resume from savepoint

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407616#comment-16407616
 ] 

ASF GitHub Bot commented on FLINK-8975:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5733

[FLINK-8975] [test] Add resume from savepoint end-to-end test

## What is the purpose of the change

This pull request adds an end-to-end test that verifies resuming a job from 
a savepoint.

The complete end-to-end test consists of the following:

1. The `StateMachineExample` job is used for the end-to-end test.
2. A separate job generates Kafka events for the state machine.
3. After the state machine job runs for a while, we take a savepoint.
4. The state machine job is cancelled and resumed from the savepoint.

None of the above steps should result in errors or output from the state 
machine job; if they do, the end-to-end test fails.

## Brief change log

- Add a separate main class for the Kafka events generator job
- Add `test_resume_savepoint.sh` test script

## Verifying this change

This PR itself introduces a new test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5733


commit 529e060cb05fd723b8656dcc9ef48f8011282dd8
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:25:37Z

[FLINK-8975] [test] Add Kafka events generator job for StateMachineExample

commit 213638b4194cceccd597e90c78631a6c6a191abb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:32:51Z

[FLINK-8975] [test] Add resume from savepoint end-to-end test




> End-to-end test: Resume from savepoint
> --
>
> Key: FLINK-8975
> URL: https://issues.apache.org/jira/browse/FLINK-8975
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Users usually take a savepoint and want to resume from it. In order to verify 
> that Flink supports this feature, we should add an end-to-end test which 
> scripts this behavior. We should use the general purpose testing job 
> FLINK-8971 with failures disabled for that.
> The end-to-end test should do the following:
> * Submit FLINK-8971 job
> * Verify that the savepoint is there
> * Cancel job and resume from savepoint
> * Verify that job could be resumed
> * Use different StateBackends: RocksDB incremental async/sync, RocksDB full 
> async/sync, FsStateBackend async/sync
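
For the last bullet, a minimal sketch of how a test driver could switch between
the listed backend variants. The checkpoint URI, the argument handling, and the
placeholder pipeline are illustrative assumptions, not part of the ticket; the
boolean flag on the RocksDB constructor toggles incremental checkpoints.

{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendSelectionSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder checkpoint location
        String checkpointDir = "hdfs:///flink/checkpoints";
        String backend = args.length > 0 ? args[0] : "rocksdb-incremental";

        switch (backend) {
            case "rocksdb-incremental":
                // second argument enables incremental checkpoints
                env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
                break;
            case "rocksdb-full":
                env.setStateBackend(new RocksDBStateBackend(checkpointDir, false));
                break;
            case "fs":
            default:
                env.setStateBackend(new FsStateBackend(checkpointDir));
                break;
        }

        // trivial placeholder pipeline; the real test would build the
        // general purpose testing job (FLINK-8971) here instead
        env.fromElements(1, 2, 3).print();

        env.execute("Savepoint end-to-end test job");
    }
}
{code}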



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5733: [FLINK-8975] [test] Add resume from savepoint end-...

2018-03-21 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5733

[FLINK-8975] [test] Add resume from savepoint end-to-end test

## What is the purpose of the change

This pull request adds an end-to-end test that verifies resuming a job from 
a savepoint.

The complete end-to-end test consists of the following:

1. The `StateMachineExample` job is used for the end-to-end test.
2. A separate job generates Kafka events for the state machine.
3. After the state machine job runs for a while, we take a savepoint.
4. The state machine job is cancelled and resumed from the savepoint.

None of the above steps should result in errors or output from the state 
machine job; if they do, the end-to-end test fails.

## Brief change log

- Add a separate main class for the Kafka events generator job
- Add `test_resume_savepoint.sh` test script

## Verifying this change

This PR itself introduces a new test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8975

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5733


commit 529e060cb05fd723b8656dcc9ef48f8011282dd8
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:25:37Z

[FLINK-8975] [test] Add Kafka events generator job for StateMachineExample

commit 213638b4194cceccd597e90c78631a6c6a191abb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-03-21T08:32:51Z

[FLINK-8975] [test] Add resume from savepoint end-to-end test




---


[jira] [Closed] (FLINK-8519) FileAlreadyExistsException on Start Flink Session

2018-03-21 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-8519.
--
Resolution: Invalid

Thanks [~yew1eb] for the clarification. Let me close this jira ticket then.

> FileAlreadyExistsException on Start Flink Session 
> --
>
> Key: FLINK-8519
> URL: https://issues.apache.org/jira/browse/FLINK-8519
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Hai Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> *steps to reproduce:*
>  1. build flink from source , git commit: c1734f4
>  2. run script:
> source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher;
> export YARN_CONF_DIR=/path/hadoop/etc/hadoop;
> export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop;
> export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; 
> /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D 
> yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java 
> %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm 
> job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d
>  
>  *error infos:*
> 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> Error while running the Flink Yarn session.
>  java.lang.reflect.UndeclaredThrowableException
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786)
>  Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>  ... 2 more
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user 
> already exists as dir; cannot create link here
>  at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244)
>  at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:334)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:161)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161)
>  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:485)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
>  ... 7 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8979) Extend Kafka end-to-end tests to run with different versions

2018-03-21 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8979:
--

Assignee: Tzu-Li (Gordon) Tai

> Extend Kafka end-to-end tests to run with different versions
> 
>
> Key: FLINK-8979
> URL: https://issues.apache.org/jira/browse/FLINK-8979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current {{Kafka}} end-to-end test only runs with Kafka 0.10. We should 
> extend the test to also run with
> * Kafka 0.8
> * Kafka 0.9
> * Kafka 0.11
> Additionally we should change the test job to not be embarrassingly parallel 
> by introducing a shuffle.
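
For the shuffle change, a sketch of what the adjusted test job could look like,
assuming the 0.10 consumer; the topic, broker address, group id, and the key
extraction are placeholders. The keyBy between the source and the map forces a
hash-partitioned network exchange, so the pipeline is no longer a chain of
independent parallel subtasks.

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ShuffledKafkaTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "kafka-e2e-test");          // placeholder

        env
            .addSource(new FlinkKafkaConsumer010<>("test-input", new SimpleStringSchema(), props))
            // keyBy introduces a network shuffle between the source and the map
            .keyBy(line -> line.isEmpty() ? "" : line.substring(0, 1))
            .map(line -> line.toLowerCase())
            .print();

        env.execute("Kafka end-to-end test with shuffle");
    }
}
{code}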



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407601#comment-16407601
 ] 

Till Rohrmann commented on FLINK-9026:
--

I think it is often better to close/unregister resources in the scope where 
they have been opened/registered. This makes resource management much easier.

But then we should at least close the {{TaskManagerMetricGroup}} when the 
{{TaskExecutor}} is shut down.
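
A minimal sketch of that lifecycle, with placeholder types; this is not Flink's
actual metric group API, and all names here are illustrative.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Placeholder stand-in for the TaskManagerMetricGroup/TaskMetricGroup hierarchy.
class MetricGroupSketch implements AutoCloseable {

    private final Map<String, MetricGroupSketch> taskGroups = new ConcurrentHashMap<>();

    MetricGroupSketch addTaskGroup(String executionId) {
        return taskGroups.computeIfAbsent(executionId, id -> new MetricGroupSketch());
    }

    void removeTaskGroup(String executionId) {
        MetricGroupSketch group = taskGroups.remove(executionId);
        if (group != null) {
            group.close();
        }
    }

    @Override
    public void close() {
        // close any task groups that were never unregistered, then this group
        taskGroups.keySet().forEach(this::removeTaskGroup);
    }
}

class TaskExecutorSketch {

    private final MetricGroupSketch taskManagerMetricGroup = new MetricGroupSketch();

    // the scope that registered the task group also unregisters it,
    // once the task reaches a final state (FINISHED/CANCELED/FAILED)
    void onTaskFinalState(String executionId) {
        taskManagerMetricGroup.removeTaskGroup(executionId);
    }

    // mirrors the suggestion above: close the group when the executor shuts down
    void postStop() {
        taskManagerMetricGroup.close();
    }
}
{code}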

> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> leave this to the caller.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9037) Test flake Kafka09ITCase#testCancelingEmptyTopic

2018-03-21 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9037:
---

 Summary: Test flake Kafka09ITCase#testCancelingEmptyTopic
 Key: FLINK-9037
 URL: https://issues.apache.org/jira/browse/FLINK-9037
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.0
Reporter: Stephan Ewen


{code}
Test 
testCancelingEmptyTopic(org.apache.flink.streaming.connectors.kafka.Kafka09ITCase)
 failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 6 
milliseconds
{code}

Full log: https://api.travis-ci.org/v3/job/356044885/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407588#comment-16407588
 ] 

ASF GitHub Bot commented on FLINK-9034:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176006260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
protected TypeSerializer<T> serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

good catch, will fix that upon merging


> State Descriptors drop TypeInformation on serialization
> ---
>
> Key: FLINK-9034
> URL: https://issues.apache.org/jira/browse/FLINK-9034
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction<MyType, MyType> {
> 
>     private final ValueStateDescriptor<MyType> descr =
>             new ValueStateDescriptor<>("state name", MyType.class);
> 
>     private ValueState<MyType> state;
> 
>     @Override
>     public void open(Configuration parameters) {
>         state = getRuntimeContext().getState(descr);
>     }
> }
> {code}
> The problem is that the state descriptor drops the type information and 
> creates a serializer before serialization, as part of shipping the function to 
> the cluster. To do that, it initializes the serializer with an empty 
> execution config, making serialization inconsistent.
> This is mainly an artifact from the days when dropping the type information 
> before shipping was necessary, because the type info was not serializable. It 
> now is, and we can fix that bug.
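
A sketch of the direction the fix implies: keep the (now serializable) type
information across Java serialization and create the serializer only once a
real ExecutionConfig is supplied. The names loosely follow StateDescriptor, but
this is illustrative code, not the actual patch.

{code:java}
import java.io.Serializable;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public abstract class LazyDescriptorSketch<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    // survives Java serialization now that TypeInformation is Serializable
    private final TypeInformation<T> typeInfo;

    // created lazily; never derived from an empty placeholder ExecutionConfig
    private transient TypeSerializer<T> serializer;

    protected LazyDescriptorSketch(Class<T> type) {
        this.typeInfo = TypeInformation.of(type);
    }

    public void initializeSerializerUnlessSet(ExecutionConfig config) {
        if (serializer == null) {
            serializer = typeInfo.createSerializer(config);
        }
    }

    public TypeSerializer<T> getSerializer() {
        if (serializer == null) {
            throw new IllegalStateException(
                    "Serializer not initialized; call initializeSerializerUnlessSet() first.");
        }
        return serializer;
    }
}
{code}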



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r176006260
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -77,18 +80,22 @@
 
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the type is serialized or an ExecutionConfig is 
provided. */
+   @Nullable
protected TypeSerializer<T> serializer;
 
+   /** The type information describing the value type. Only used to lazily 
create the serializer
+* and dropped during serialization */
+   @Nullable
--- End diff --

good catch, will fix that upon merging


---


[jira] [Commented] (FLINK-9029) Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2

2018-03-21 Thread Mohammad Abareghi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407582#comment-16407582
 ] 

Mohammad Abareghi commented on FLINK-9029:
--

[~StephanEwen] Yes. Security is OFF. 

I'll try to remove the Hadoop uber jar ASAP (hopefully later today).

Will drop a comment here. 

> Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2
> ---
>
> Key: FLINK-9029
> URL: https://issues.apache.org/jira/browse/FLINK-9029
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.4.2
> Environment: * Flink-1.4.2 (Flink-1.4.1)
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>Reporter: Mohammad Abareghi
>Priority: Major
>
> *Environment*
>  * Flink-1.4.2
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>  
> *Description*
> I have a Java job in flink-1.4.0 which writes to a specific HDFS path. 
> After updating to flink-1.4.2 I'm getting the following error from Hadoop 
> complaining that the user doesn't have write permission to the given path:
> {code:java}
> WARN org.apache.hadoop.security.UserGroupInformation: 
> PriviledgedActionException as:xng (auth:SIMPLE) 
> cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
> {code}
> *NOTE*:
>  * If I run the same job on flink-1.4.0, the error disappears regardless of 
> which Flink version (1.4.0 or 1.4.2) the job's dependencies use
>  * Also, if I run the job's main method from my IDE and pass the same 
> parameters, I don't get the above error.
> *NOTE*:
> It seems the problem is somehow in 
> {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace that 
> with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the 
> cluster, and run my job (flink topology), then the error doesn't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9029) Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2

2018-03-21 Thread Mohammad Abareghi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohammad Abareghi updated FLINK-9029:
-
Description: 
*Environment*
 * Flink-1.4.2
 * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
 * Ubuntu 16.04.3 LTS
 * Java 8

 

*Description*

I have a Java job in flink-1.4.0 which writes to a specific HDFS path. After 
updating to flink-1.4.2 I'm getting the following error from Hadoop complaining 
that the user doesn't have write permission to the given path:
{code:java}
WARN org.apache.hadoop.security.UserGroupInformation: 
PriviledgedActionException as:xng (auth:SIMPLE) 
cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
{code}
*NOTE*:
 * If I run the same job on flink-1.4.0, the error disappears regardless of 
which Flink version (1.4.0 or 1.4.2) the job's dependencies use
 * Also, if I run the job's main method from my IDE and pass the same 
parameters, I don't get the above error.

*NOTE*:

It seems the problem is somehow in 
{{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace that with 
{{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the cluster, 
and run my job (flink topology), then the error doesn't appear.

  was:
*Environment*
 * Flink-1.4.2
 * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
 * Ubuntu 16.04.3 LTS
 * Java 8

*Description*

I have a Java job in flink-1.4.0 which writes to a specific HDFS path. After 
updating to flink-1.4.2 I'm getting the following error from Hadoop complaining 
that the user doesn't have write permission to the given path:
{code:java}
WARN org.apache.hadoop.security.UserGroupInformation: 
PriviledgedActionException as:xng (auth:SIMPLE) 
cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
{code}
*NOTE*:
 * If I run the same job on flink-1.4.0, the error disappears regardless of 
which Flink version (1.4.0 or 1.4.2) the job's dependencies use
 * Also, if I run the job's main method from my IDE and pass the same 
parameters, I don't get the above error.

*NOTE*:

It seems the problem is somehow in 
{{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace that with 
{{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the cluster, 
and run my job (flink topology), then the error doesn't appear.


> Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2
> ---
>
> Key: FLINK-9029
> URL: https://issues.apache.org/jira/browse/FLINK-9029
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.4.2
> Environment: * Flink-1.4.2 (Flink-1.4.1)
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>Reporter: Mohammad Abareghi
>Priority: Major
>
> *Environment*
>  * Flink-1.4.2
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>  
> *Description*
> I have a Java job in flink-1.4.0 which writes to a specific HDFS path. 
> After updating to flink-1.4.2 I'm getting the following error from Hadoop 
> complaining that the user doesn't have write permission to the given path:
> {code:java}
> WARN org.apache.hadoop.security.UserGroupInformation: 
> PriviledgedActionException as:xng (auth:SIMPLE) 
> cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
> {code}
> *NOTE*:
>  * If I run the same job on flink-1.4.0, the error disappears regardless of 
> which Flink version (1.4.0 or 1.4.2) the job's dependencies use
>  * Also, if I run the job's main method from my IDE and pass the same 
> parameters, I don't get the above error.
> *NOTE*:
> It seems the problem is somehow in 
> {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace that 
> with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the 
> cluster, and run my job (flink topology), then the error doesn't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

