[jira] [Commented] (FLINK-2535) Fixed size sample algorithm optimization

2015-09-02 Thread GaoLun (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726870#comment-14726870
 ] 

GaoLun commented on FLINK-2535:
---

Hi. I have replaced the sampling algorithm with scalable simple random 
sampling (SSRS) based on the paper, and I have run some tests to compare the 
performance of the two sampling methods. Here are some statistics on the 
number of rejected items (source_size = 10000000):

SampleSize      SRS         SSRS
100             9998727     9998488
100             9998755     9998493
500             9994502     9994081
500             9994624     9994029
1000            9989781     9989061
1000            9989657     9989132
5000            9956780     9956061
5000            9956812     9956018
10000           9921601     9919174
10000           9920866     9919396
50000           9685085     9682182
50000           9685203     9681611
100000          9439345     9435887
100000          9440469     9435521
500000          8001535     7998046
500000          8000370     7996807
1000000         6699752     6690612
1000000         6696175     6692111
5000000         1534814     1530409
5000000         1534088     1529784

From these statistics we can see that SRS rejects slightly more items than 
SSRS, but the difference is not significant.
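For reference, the rejected-item count for plain SRS can be reproduced with the classic fixed-size reservoir sampler (Vitter's Algorithm R), where an incoming element counts as rejected when it does not replace a reservoir slot. This is a minimal illustrative sketch, not Flink's actual sampler; the class and method names are made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch of fixed-size simple random sampling via reservoir sampling
// (Vitter's Algorithm R). "Rejected" counts the elements, seen after the
// reservoir is full, that do not replace an existing reservoir entry.
public class ReservoirSampleSketch {

    // Sample k items from the stream 0..n-1 into 'out';
    // returns how many incoming items were rejected along the way.
    public static int sampleAndCountRejected(int n, int k, long seed, List<Integer> out) {
        Random rnd = new Random(seed);
        int rejected = 0;
        for (int i = 0; i < n; i++) {
            if (i < k) {
                out.add(i);                  // fill the reservoir first
            } else {
                int j = rnd.nextInt(i + 1);  // uniform index in [0, i]
                if (j < k) {
                    out.set(j, i);           // accept: replace a reservoir slot
                } else {
                    rejected++;              // reject the incoming item
                }
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        List<Integer> sample = new ArrayList<>();
        int rejected = sampleAndCountRejected(10_000, 100, 42L, sample);
        System.out.println("sample size = " + sample.size() + ", rejected = " + rejected);
    }
}
```

With n = 10,000,000 and k = 100, the expected number of replacements for this scheme is roughly k * ln(n/k) ~ 1,150, so the expected rejections are about 9,998,750 — consistent with the first rows of the statistics above.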

> Fixed size sample algorithm optimization
> 
>
> Key: FLINK-2535
> URL: https://issues.apache.org/jira/browse/FLINK-2535
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Chengxiang Li
>Priority: Minor
>
> Fixed size sample algorithms are known to be less efficient than sample 
> algorithms with a fraction, but sometimes they are necessary. Some 
> optimizations could significantly reduce the storage size and computation 
> cost, such as the 
> algorithm described in [this 
> paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2535) Fixed size sample algorithm optimization

2015-09-02 Thread GaoLun (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GaoLun updated FLINK-2535:
--
Attachment: sampling.png

Statistical data of rejected items' number with SRS & SSRS.

> Fixed size sample algorithm optimization
> 
>
> Key: FLINK-2535
> URL: https://issues.apache.org/jira/browse/FLINK-2535
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Chengxiang Li
>Priority: Minor
> Attachments: sampling.png
>
>
> Fixed size sample algorithms are known to be less efficient than sample 
> algorithms with a fraction, but sometimes they are necessary. Some 
> optimizations could significantly reduce the storage size and computation 
> cost, such as the 
> algorithm described in [this 
> paper|http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a].





[jira] [Commented] (FLINK-2604) Access to SessionID without holding lock in ZooKeeperLeaderElectionService

2015-09-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726916#comment-14726916
 ] 

Till Rohrmann commented on FLINK-2604:
--

This is not a problem, since all other write accesses happen after a 
{{leaderLatch.hasLeadership()}} check or as part of the {{LeaderLatchListener}} 
callback {{isLeader}}. Since the leader latch is closed before the {{null}} 
values are assigned to the variables, the call {{hasLeadership()}} will always 
return false.

> Access to SessionID without holding lock in ZooKeeperLeaderElectionService
> --
>
> Key: FLINK-2604
> URL: https://issues.apache.org/jira/browse/FLINK-2604
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In the stop() method of ZooKeeperLeaderElectionService:
> {code}
> confirmedLeaderSessionID = null;
> issuedLeaderSessionID = null;
> {code}
> In other parts of the class, access to either of the above fields is guarded 
> by synchronized (lock).
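For illustration, the fix implied by the report is simply to perform the resets while holding the same lock. The sketch below uses the field and lock names from the issue text; everything else about the real ZooKeeperLeaderElectionService is elided and hypothetical:

```java
// Minimal sketch of guarding the session-ID resets with the shared lock.
// Field and lock names follow the issue text; the placeholder values and
// the getter exist only to make the sketch self-contained.
public class LeaderElectionServiceSketch {

    private final Object lock = new Object();

    private String issuedLeaderSessionID = "issued-session";       // placeholder
    private String confirmedLeaderSessionID = "confirmed-session"; // placeholder

    public void stop() {
        synchronized (lock) {
            // Resetting under the lock keeps these writes consistent with
            // every reader/writer that also synchronizes on 'lock'.
            confirmedLeaderSessionID = null;
            issuedLeaderSessionID = null;
        }
    }

    public String getConfirmedLeaderSessionID() {
        synchronized (lock) {
            return confirmedLeaderSessionID;
        }
    }
}
```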





[jira] [Issue Comment Deleted] (FLINK-2604) Access to SessionID without holding lock in ZooKeeperLeaderElectionService

2015-09-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2604:
-
Comment: was deleted

(was: This is not a problem, since all other writing access happen after a 
{{leaderLatch.hasLeadership()}} or as part of the {{LeaderLatchListener}} 
callback {{isLeader}}. Since the leader latch is closed first before assigning 
the {{null}} values to the variables, the call {{hasLeadership()}} will always 
be false.)

> Access to SessionID without holding lock in ZooKeeperLeaderElectionService
> --
>
> Key: FLINK-2604
> URL: https://issues.apache.org/jira/browse/FLINK-2604
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In stop() method of ZooKeeperLeaderElectionService :
> {code}
> confirmedLeaderSessionID = null;
> issuedLeaderSessionID = null;
> {code}
> In other part of the class, access to either of the above fields is guarded 
> by synchronized (lock).





[jira] [Commented] (FLINK-2604) Access to SessionID without holding lock in ZooKeeperLeaderElectionService

2015-09-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726921#comment-14726921
 ] 

Till Rohrmann commented on FLINK-2604:
--

Forget my last comment. Thanks for spotting it. I'll fix it.

> Access to SessionID without holding lock in ZooKeeperLeaderElectionService
> --
>
> Key: FLINK-2604
> URL: https://issues.apache.org/jira/browse/FLINK-2604
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In stop() method of ZooKeeperLeaderElectionService :
> {code}
> confirmedLeaderSessionID = null;
> issuedLeaderSessionID = null;
> {code}
> In other part of the class, access to either of the above fields is guarded 
> by synchronized (lock).





[GitHub] flink pull request: [FLINK-2590] fixing DataSetUtils.zipWithUnique...

2015-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1075


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2584] Check for unshaded classes in fat...

2015-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1076




[jira] [Commented] (FLINK-2590) DataSetUtils.zipWithUniqueID creates duplicate IDs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726935#comment-14726935
 ] 

ASF GitHub Bot commented on FLINK-2590:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1075


> DataSetUtils.zipWithUniqueID creates duplicate IDs
> --
>
> Key: FLINK-2590
> URL: https://issues.apache.org/jira/browse/FLINK-2590
> Project: Flink
>  Issue Type: Bug
>  Components: Java API, Scala API
>Affects Versions: 0.10, master
>Reporter: Martin Junghanns
>Assignee: Martin Junghanns
>Priority: Minor
>
> The function creates IDs using the following code:
> {code:java}
> shifter = log2(numberOfParallelSubtasks)
> id = counter << shifter + taskId;
> {code}
> As the binary function + is executed before the bitshift <<, this results in 
> cases where different tasks create the same ID. It essentially calculates
> {code}
> counter*2^(shifter+taskId)
> {code}
> which is 0 for counter = 0 and all values of shifter and taskID.
> Consider the following example.
> numberOfParallelSubtasks = 8 
> shifter = log2(8) = 4 (maybe rename the function?)
> produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 256
> start: 2, shifter: 4 taskId: 3 label: 256
> start: 4, shifter: 4 taskId: 2 label: 256
> {code}
> I would suggest the following:
> {code}
> counter*2^(shifter)+taskId
> {code}
> which in code is equivalent to
> {code}
> shifter = log2(numberOfParallelSubtasks);
> id = (counter << shifter) + taskId;
> {code}
> and for our example produces:
> {code}
> start: 1, shifter: 4 taskId: 4 label: 20
> start: 2, shifter: 4 taskId: 3 label: 35
> start: 4, shifter: 4 taskId: 2 label: 66
> {code}
> So we move the counter to the left and add the task id. As there is space for 
> 2^shifter numbers, this prevents collisions.
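The precedence pitfall and the proposed fix can be checked with a few lines of plain Java (hypothetical class and method names; values taken from the example above, with shifter = 4):

```java
// In Java, '+' binds tighter than '<<', so "counter << shifter + taskId"
// shifts by (shifter + taskId) instead of adding taskId after the shift.
public class ZipWithUniqueIdBug {

    // Buggy form from the issue: equivalent to counter << (shifter + taskId).
    public static long buggyId(long counter, int shifter, int taskId) {
        return counter << shifter + taskId;
    }

    // Proposed fix: shift first, then add the task id.
    public static long fixedId(long counter, int shifter, int taskId) {
        return (counter << shifter) + taskId;
    }

    public static void main(String[] args) {
        // The three colliding cases from the issue:
        System.out.println(buggyId(1, 4, 4));   // 256
        System.out.println(buggyId(2, 4, 3));   // 256  <- collision
        System.out.println(buggyId(4, 4, 2));   // 256  <- collision
        System.out.println(fixedId(1, 4, 4));   // 20
        System.out.println(fixedId(2, 4, 3));   // 35
        System.out.println(fixedId(4, 4, 2));   // 66
    }
}
```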





[GitHub] flink pull request: [FLINK-2596] Failing Test: RandomSamplerTest

2015-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1080




[jira] [Commented] (FLINK-2596) Failing Test: RandomSamplerTest

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726933#comment-14726933
 ] 

ASF GitHub Bot commented on FLINK-2596:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1080


> Failing Test: RandomSamplerTest
> ---
>
> Key: FLINK-2596
> URL: https://issues.apache.org/jira/browse/FLINK-2596
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Chengxiang Li
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> {noformat}
> Tests run: 17, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 14.925 sec 
> <<< FAILURE! - in org.apache.flink.api.java.sampling.RandomSamplerTest
> testReservoirSamplerWithMultiSourcePartitions2(org.apache.flink.api.java.sampling.RandomSamplerTest)
>  Time elapsed: 0.444 sec <<< ERROR!
> java.lang.IllegalArgumentException: Comparison method violates its general 
> contract!
> at java.util.TimSort.mergeLo(TimSort.java:747)
> at java.util.TimSort.mergeAt(TimSort.java:483)
> at java.util.TimSort.mergeCollapse(TimSort.java:410)
> at java.util.TimSort.sort(TimSort.java:214)
> at java.util.TimSort.sort(TimSort.java:173)
> at java.util.Arrays.sort(Arrays.java:659)
> at java.util.Collections.sort(Collections.java:217)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.transferFromListToArrayWithOrder(RandomSamplerTest.java:375)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.getSampledOutput(RandomSamplerTest.java:367)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyKSTest(RandomSamplerTest.java:338)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyRandomSamplerWithSampleSize(RandomSamplerTest.java:330)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.verifyReservoirSamplerWithReplacement(RandomSamplerTest.java:290)
> at 
> org.apache.flink.api.java.sampling.RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2(RandomSamplerTest.java:212)
> Results :
> Tests in error:
> RandomSamplerTest.testReservoirSamplerWithMultiSourcePartitions2:212->verifyReservoirSamplerWithReplacement:290->verifyRandomSamplerWithSampleSize:330->verifyKSTest:338->getSampledOutput:367->transferFromListToArrayWithOrder:375
>  ยป IllegalArgument
> {noformat}
> https://travis-ci.org/apache/flink/jobs/77750329
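For background on the failure itself: TimSort raises "Comparison method violates its general contract!" when the supplied Comparator is not a valid total order. A common way to trip it, shown here as a generic hypothetical illustration (not necessarily the actual bug in RandomSamplerTest), is a comparator that never returns 0 and therefore breaks antisymmetry:

```java
import java.util.Comparator;

// Illustration of a Comparator that violates its general contract.
// The contract requires sgn(compare(x, y)) == -sgn(compare(y, x)); a
// comparator that never returns 0 for equal elements breaks this, and
// TimSort may detect the inconsistency and throw IllegalArgumentException.
public class BadComparatorDemo {

    // Broken: returns 1 for equal inputs instead of 0.
    public static final Comparator<Double> BROKEN = (a, b) -> a < b ? -1 : 1;

    // Correct: use the library's total ordering for doubles.
    public static final Comparator<Double> CORRECT = Double::compare;

    public static void main(String[] args) {
        // Antisymmetry fails for the broken comparator on equal elements:
        System.out.println(BROKEN.compare(1.0, 1.0));  // 1, but the contract requires 0
        System.out.println(CORRECT.compare(1.0, 1.0)); // 0
    }
}
```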





[jira] [Commented] (FLINK-2584) ASM dependency is not shaded away

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726934#comment-14726934
 ] 

ASF GitHub Bot commented on FLINK-2584:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1076


> ASM dependency is not shaded away
> -
>
> Key: FLINK-2584
> URL: https://issues.apache.org/jira/browse/FLINK-2584
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 0.9, master
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 0.10, 0.9.1
>
>
> ASM is not correctly shaded away. If you build the quick start against the 
> snapshot version, you will see the following dependencies. Robert is fixing 
> this.
> {code}
> [INFO] +- org.apache.flink:flink-java:jar:0.9.1:compile
> [INFO] |  +- org.apache.flink:flink-core:jar:0.9.1:compile
> [INFO] |  |  \- commons-collections:commons-collections:jar:3.2.1:compile
> [INFO] |  +- org.apache.flink:flink-shaded-include-yarn:jar:0.9.1:compile
> [INFO] |  +- org.apache.avro:avro:jar:1.7.6:compile
> [INFO] |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
> [INFO] |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
> [INFO] |  |  +- com.thoughtworks.paranamer:paranamer:jar:2.3:compile
> [INFO] |  |  +- org.xerial.snappy:snappy-java:jar:1.0.5:compile
> [INFO] |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
> [INFO] |  | \- org.tukaani:xz:jar:1.0:compile
> [INFO] |  +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
> [INFO] |  |  +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
> [INFO] |  |  \- org.objenesis:objenesis:jar:2.1:compile
> [INFO] |  +- com.twitter:chill_2.10:jar:0.5.2:compile
> [INFO] |  |  +- org.scala-lang:scala-library:jar:2.10.4:compile
> [INFO] |  |  \- com.twitter:chill-java:jar:0.5.2:compile
> [INFO] |  +- com.twitter:chill-avro_2.10:jar:0.5.2:compile
> [INFO] |  |  +- com.twitter:chill-bijection_2.10:jar:0.5.2:compile
> [INFO] |  |  |  \- com.twitter:bijection-core_2.10:jar:0.7.2:compile
> [INFO] |  |  \- com.twitter:bijection-avro_2.10:jar:0.7.2:compile
> [INFO] |  +- de.javakaffee:kryo-serializers:jar:0.36:compile
> [INFO] |  |  +- com.esotericsoftware:kryo:jar:3.0.3:compile
> [INFO] |  |  |  +- com.esotericsoftware:reflectasm:jar:1.10.1:compile
> [INFO] |  |  |  |  \- org.ow2.asm:asm:jar:5.0.3:compile
> [INFO] |  |  |  \- com.esotericsoftware:minlog:jar:1.3.0:compile
> [INFO] |  |  \- com.google.protobuf:protobuf-java:jar:2.6.1:compile
> {code}





[jira] [Closed] (FLINK-2604) Access to SessionID without holding lock in ZooKeeperLeaderElectionService

2015-09-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-2604.

Resolution: Fixed

Fixed via 31aededecfc4ec2085c00f23e2a78f660995efaa

> Access to SessionID without holding lock in ZooKeeperLeaderElectionService
> --
>
> Key: FLINK-2604
> URL: https://issues.apache.org/jira/browse/FLINK-2604
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Till Rohrmann
>
> In stop() method of ZooKeeperLeaderElectionService :
> {code}
> confirmedLeaderSessionID = null;
> issuedLeaderSessionID = null;
> {code}
> In other part of the class, access to either of the above fields is guarded 
> by synchronized (lock).





[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

2015-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1066#discussion_r38506169
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class RemoteEnvironmentITCase {
+
+private static final int TM_SLOTS = 4;
+
+private static final int NUM_TM = 1;
+
+private static final int USER_DOP = 2;
+
+private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
+
+private static final String VALID_STARTUP_TIMEOUT = "100 s";
+
+private static ForkableFlinkMiniCluster cluster;
+
+@BeforeClass
+public static void setupCluster() {
+try {
+Configuration config = new Configuration();
+config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TM);
+config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
TM_SLOTS);
+cluster = new ForkableFlinkMiniCluster(config, false);
+cluster.start();
+}
+catch (Exception e) {
+e.printStackTrace();
+fail("Error starting test cluster: " + e.getMessage());
+}
+}
+
+@AfterClass
+public static void tearDownCluster() {
+try {
+cluster.stop();
+}
+catch (Throwable t) {
+t.printStackTrace();
+fail("Cluster shutdown caused an exception: " + 
t.getMessage());
+}
+}
+
+/**
+ * Ensure that Akka configuration parameters can be set.
+ */
+@Test(expected=IllegalArgumentException.class)
+public void testInvalidAkkaConfiguration() throws Throwable {
+Configuration config = new Configuration();
+config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
+
+final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+cluster.hostname(),
+cluster.getLeaderRPCPort(),
+config
+);
+env.getConfig().disableSysoutLogging();
+
+DataSet result = env.createInput(new 
TestNonRichInputFormat());
+result.output(new LocalCollectionOutputFormat(new 
ArrayList()));
+try {
+env.execute();
--- End diff --

Shouldn't we fail after this statement?



[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726940#comment-14726940
 ] 

ASF GitHub Bot commented on FLINK-2373:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1066#discussion_r38506169
  

[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

2015-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1066#discussion_r38506469
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class RemoteEnvironmentITCase {
+
+private static final int TM_SLOTS = 4;
+
+private static final int NUM_TM = 1;
+
+private static final int USER_DOP = 2;
+
+private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
+
+private static final String VALID_STARTUP_TIMEOUT = "100 s";
+
+private static ForkableFlinkMiniCluster cluster;
+
+@BeforeClass
+public static void setupCluster() {
+try {
+Configuration config = new Configuration();
+config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TM);
+config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
TM_SLOTS);
+cluster = new ForkableFlinkMiniCluster(config, false);
+cluster.start();
+}
+catch (Exception e) {
+e.printStackTrace();
+fail("Error starting test cluster: " + e.getMessage());
+}
+}
+
+@AfterClass
+public static void tearDownCluster() {
+try {
+cluster.stop();
+}
+catch (Throwable t) {
+t.printStackTrace();
+fail("Cluster shutdown caused an exception: " + 
t.getMessage());
+}
+}
+
+/**
+ * Ensure that Akka configuration parameters can be set.
+ */
+@Test(expected=IllegalArgumentException.class)
+public void testInvalidAkkaConfiguration() throws Throwable {
+Configuration config = new Configuration();
+config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
+
+final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
+cluster.hostname(),
+cluster.getLeaderRPCPort(),
+config
+);
+env.getConfig().disableSysoutLogging();
+
+DataSet result = env.createInput(new 
TestNonRichInputFormat());
+result.output(new LocalCollectionOutputFormat(new 
ArrayList()));
+try {
+env.execute();
+} catch (ProgramInvocationException ex) {
+throw ex.getCause();
+}
+}
+
+/**
+ * Ensure that the program parallelism can be set even if the 
configuration is supplied.
+ */
  

[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726942#comment-14726942
 ] 

ASF GitHub Bot commented on FLINK-2373:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1066#discussion_r38506469
  

[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

2015-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1066#issuecomment-136969071
  
From your last comment I conclude that you only wanted to allow Akka-specific 
features to be set with the configuration. However, I couldn't find where you 
check this. Furthermore, if I remove the `env.setParallelism` call in the 
`testUserSpecificParallelism` test case, I can still control the default DOP 
with `config.setInteger(ConfigConstants.DEFAULT_PARALLELISM, 42)`, for example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726945#comment-14726945
 ] 

ASF GitHub Bot commented on FLINK-2373:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1066#issuecomment-136969071
  
From your last comment I conclude that you only wanted to allow Akka-specific 
features to be set with the configuration. However, I couldn't find where you 
check this. Furthermore, if I remove the `env.setParallelism` call in the 
`testUserSpecificParallelism` test case, I can still control the default DOP 
with `config.setInteger(ConfigConstants.DEFAULT_PARALLELISM, 42)`, for example.


> Add configuration parameter to createRemoteEnvironment method
> -
>
> Key: FLINK-2373
> URL: https://issues.apache.org/jira/browse/FLINK-2373
> Project: Flink
>  Issue Type: Bug
>  Components: other
>Reporter: Andreas Kunft
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of 
> a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default value for the 
> maximum payload size in Akka, as we cannot increase the configuration value 
> (akka.remote.OversizedPayloadException: Discarding oversized payload...).
> Providing an overloaded method with a configuration parameter for the remote 
> environment fixes that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38507948
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

I agree with @mjsax that the cancel should be non-blocking. The method 
seems to be badly named; it should maybe have been named `stop()` from the 
beginning. Then it would be more obvious that it is just used for shutting down 
the source, no matter what the reason might be.

Unfortunately, `close()` is already taken as part of `RichFunction`.

I think we have two options: keep `cancel` and use it for stopping, no 
matter what the reason is, or rename it to `stop()` and use it in the same way 
to make it clearer what it does.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726972#comment-14726972
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38507948
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

I agree with @mjsax that the cancel should be non-blocking. The method 
seems to be badly named; it should maybe have been named `stop()` from the 
beginning. Then it would be more obvious that it is just used for shutting down 
the source, no matter what the reason might be.

Unfortunately, `close()` is already taken as part of `RichFunction`.

I think we have two options: keep `cancel` and use it for stopping, no 
matter what the reason is, or rename it to `stop()` and use it in the same way 
to make it clearer what it does.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726975#comment-14726975
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38508028
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

About Kafka, I think the approach would be to have `cancel()` signal to the 
source that it should close itself, then have the actual shutdown/cleanup in 
`close()` of RichFunction. What do you think about this @rmetzger ?


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38508028
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

About Kafka, I think the approach would be to have `cancel()` signal to the 
source that it should close itself, then have the actual shutdown/cleanup in 
`close()` of RichFunction. What do you think about this @rmetzger ?




[GitHub] flink pull request: [FLINK-2379][ml]Add column wise statistics for...

2015-09-02 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1032#issuecomment-136978688
  
@tillrohrmann , can you review this?
This will be a starting point for a package to provide more statistical 
methods, such as hypothesis testing, correlation, etc.




[jira] [Commented] (FLINK-2379) Add methods to evaluate field wise statistics over DataSet of vectors.

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727003#comment-14727003
 ] 

ASF GitHub Bot commented on FLINK-2379:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1032#issuecomment-136978688
  
@tillrohrmann , can you review this?
This will be a starting point for a package to provide more statistical 
methods, such as hypothesis testing, correlation, etc.


> Add methods to evaluate field wise statistics over DataSet of vectors.
> --
>
> Key: FLINK-2379
> URL: https://issues.apache.org/jira/browse/FLINK-2379
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Sachin Goel
>Assignee: Sachin Goel
>
> Design methods to evaluate statistics over a DataSet of vectors.
> For continuous fields: minimum, maximum, mean, variance.
> For discrete fields: class counts, entropy, Gini impurity.
> Further statistical measures can also be supported, for example correlation 
> between two series, computing the covariance matrix, etc. 
> [These are currently the things Spark supports.]



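The discrete-field measures listed in the issue (class counts, entropy, Gini impurity) can be sketched in plain Java, independent of the Flink API under discussion. All names here are illustrative, not the PR's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not the PR's API): statistics over one discrete column.
class DiscreteFieldStats {

    // count occurrences of each class label in the column
    static Map<String, Integer> classCounts(String[] column) {
        Map<String, Integer> counts = new HashMap<>();
        for (String v : column) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    // Shannon entropy in bits: -sum(p * log2 p)
    static double entropy(Map<String, Integer> counts) {
        double n = 0.0;
        for (int c : counts.values()) { n += c; }
        double h = 0.0;
        for (int c : counts.values()) {
            double p = c / n;
            h -= p * (Math.log(p) / Math.log(2));
        }
        return h;
    }

    // Gini impurity: 1 - sum(p^2)
    static double gini(Map<String, Integer> counts) {
        double n = 0.0;
        for (int c : counts.values()) { n += c; }
        double g = 1.0;
        for (int c : counts.values()) {
            double p = c / n;
            g -= p * p;
        }
        return g;
    }
}
```

For a column split evenly between two classes, this yields an entropy of 1 bit and a Gini impurity of 0.5, the maxima for two classes.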


[jira] [Created] (FLINK-2606) Extend StormFileSpout to handle HDFS

2015-09-02 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2606:
--

 Summary: Extend StormFileSpout to handle HDFS
 Key: FLINK-2606
 URL: https://issues.apache.org/jira/browse/FLINK-2606
 Project: Flink
  Issue Type: New Feature
  Components: Storm Compatibility
Reporter: Matthias J. Sax
Priority: Minor


Currently, `StormFileSpout` uses a simple `FileReader` internally to read local 
files. Thus, it cannot read data from HDFS.

With this ticket, either `StormFileSpout` should be extended to be able to 
handle HDFS files, or a new class (e.g., `StormHdfsSpout`) should be introduced 
for it.





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511039
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

I have to admit that this is the first time I'm looking into this pull 
request ;)
Regarding the whole blocking/non-blocking discussion: I think even if we 
wrote into the Javadocs that the cancel()/stop() call has to be implemented in 
a non-blocking fashion, there would still be users who get it wrong! It's just 
too risky to block the entire actor system with erroneous user code. (Aren't 
our windowing threads doing some magic in the close methods as well?!)
I think the cancelling of tasks in the TaskManager is also done using 
separate cancelling threads?

For the Kafka source: I believe we can move the "fetcher.close()" and 
offsetHandler.close() calls into the close() method as well. (We would probably 
need to add a cancel() method to the Fetcher interface.)
But I would not touch the Kafka consumer just to make the stop() mechanism 
more robust.



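The split described above, where cancel() only signals the fetch loop and the blocking cleanup is deferred to close(), can be sketched without any Kafka or Flink dependencies. All names here are hypothetical stand-ins, not the actual connector code:

```java
// Hypothetical sketch: cancel() only flips a volatile flag, so it never blocks
// the caller; the expensive cleanup (standing in for fetcher.close() and
// offsetHandler.close()) runs in close(), after run() has returned.
class SketchSource {
    private volatile boolean running = true;
    int recordsEmitted = 0;
    boolean cleanedUp = false;

    // the source's main loop: exits promptly once cancel() is called
    void run() {
        while (running && recordsEmitted < 5) {
            recordsEmitted++; // stands in for emitting one fetched record
        }
    }

    // non-blocking: safe to call from the actor thread
    void cancel() {
        running = false;
    }

    // blocking cleanup, invoked by the runtime after run() returns
    void close() {
        cleanedUp = true; // stands in for closing fetcher and offset handler
    }
}
```

The point of the split is that the signal path (cancel) stays cheap while all resource teardown happens on the task's own thread.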


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727020#comment-14727020
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511039
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

I have to admit that this is the first time I'm looking into this pull 
request ;)
Regarding the whole blocking/non-blocking discussion: I think even if we 
wrote into the Javadocs that the cancel()/stop() call has to be implemented in 
a non-blocking fashion, there would still be users who get it wrong! It's just 
too risky to block the entire actor system with erroneous user code. (Aren't 
our windowing threads doing some magic in the close methods as well?!)
I think the cancelling of tasks in the TaskManager is also done using 
separate cancelling threads?

For the Kafka source: I believe we can move the "fetcher.close()" and 
offsetHandler.close() calls into the close() method as well. (We would probably 
need to add a cancel() method to the Fetcher interface.)
But I would not touch the Kafka consumer just to make the stop() mechanism 
more robust.



> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727021#comment-14727021
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511106
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
      -s,--scheduled            Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+     -m,--jobmanager           Address of the JobManager (master) to which
+                               to connect. Specify 'yarn-cluster' as the
+                               JobManager to deploy a YARN cluster for the
+                               job. Use this flag to connect to a different
--- End diff --

what happens when a user enters "stop -m yarn-cluster" ?


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511106
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
      -s,--scheduled            Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+     -m,--jobmanager           Address of the JobManager (master) to which
+                               to connect. Specify 'yarn-cluster' as the
+                               JobManager to deploy a YARN cluster for the
+                               job. Use this flag to connect to a different
--- End diff --

what happens when a user enters "stop -m yarn-cluster" ?




[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727108#comment-14727108
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-137009547
  
Rebased to the current master. This should be mergeable now. Travis fails 
on unrelated kafka and flink-fs-tests. Just re-triggered another build.
@StephanEwen 


> Expose attemptNumber in RuntimeContext
> --
>
> Key: FLINK-2488
> URL: https://issues.apache.org/jira/browse/FLINK-2488
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, TaskManager
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
>Priority: Minor
>
> It would be nice to expose the attemptNumber of a task in the 
> {{RuntimeContext}}. 
> This would allow user code to behave differently in restart scenarios.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727109#comment-14727109
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38515501
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
      -s,--scheduled            Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+     -m,--jobmanager           Address of the JobManager (master) to which
+                               to connect. Specify 'yarn-cluster' as the
+                               JobManager to deploy a YARN cluster for the
+                               job. Use this flag to connect to a different
--- End diff --

I never tried it on a YARN cluster. Do you think it needs special handling? 
Basically, the "stop" signal is sent to the JobManager, which forwards the signal 
to all sources. This should be independent of whether YARN is involved or not. But I 
don't have any experience with YARN. Tell me if I am wrong.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-09-02 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-137009547
  
Rebased to the current master. This should be mergeable now. Travis fails 
on unrelated kafka and flink-fs-tests. Just re-triggered another build.
@StephanEwen 




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38515501
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
      -s,--scheduled            Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+     -m,--jobmanager           Address of the JobManager (master) to which
+                               to connect. Specify 'yarn-cluster' as the
+                               JobManager to deploy a YARN cluster for the
+                               job. Use this flag to connect to a different
--- End diff --

I never tried it on a YARN cluster. Do you think it needs special handling? 
Basically, the "stop" signal is sent to the JobManager, which forwards the signal 
to all sources. This should be independent of whether YARN is involved or not. But I 
don't have any experience with YARN. Tell me if I am wrong.




[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

2015-09-02 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/979#issuecomment-137010239
  
This most likely requires a re-work because of the latest changes in Job 
Manager and Client. Closing for now while I re-examine all the logic. Will 
reopen it in a few days.




[jira] [Commented] (FLINK-2472) Make the JobClientActor check periodically if the submitted Job is still running and if the JobManager is still alive

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727111#comment-14727111
 ] 

ASF GitHub Bot commented on FLINK-2472:
---

Github user sachingoel0101 closed the pull request at:

https://github.com/apache/flink/pull/979


> Make the JobClientActor check periodically if the submitted Job is still 
> running and if the JobManager is still alive
> -
>
> Key: FLINK-2472
> URL: https://issues.apache.org/jira/browse/FLINK-2472
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Sachin Goel
>
> In case that the {{JobManager}} dies without notifying possibly connected 
> {{JobClientActors}} or if the job execution finishes without sending the 
> {{SerializedJobExecutionResult}} back to the {{JobClientActor}}, it might 
> happen that a {{JobClient.submitJobAndWait}} never returns.
> I propose to let the {{JobClientActor}} periodically check whether the 
> {{JobManager}} is still alive and whether the submitted job is still running. 
> If not, then the {{JobClientActor}} should return an exception to complete 
> the waiting future.





[jira] [Commented] (FLINK-2472) Make the JobClientActor check periodically if the submitted Job is still running and if the JobManager is still alive

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727110#comment-14727110
 ] 

ASF GitHub Bot commented on FLINK-2472:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/979#issuecomment-137010239
  
This most likely requires a re-work because of the latest changes in Job 
Manager and Client. Closing for now while I re-examine all the logic. Will 
reopen it in a few days.


> Make the JobClientActor check periodically if the submitted Job is still 
> running and if the JobManager is still alive
> -
>
> Key: FLINK-2472
> URL: https://issues.apache.org/jira/browse/FLINK-2472
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Sachin Goel
>
> In case that the {{JobManager}} dies without notifying possibly connected 
> {{JobClientActors}} or if the job execution finishes without sending the 
> {{SerializedJobExecutionResult}} back to the {{JobClientActor}}, it might 
> happen that a {{JobClient.submitJobAndWait}} never returns.
> I propose to let the {{JobClientActor}} periodically check whether the 
> {{JobManager}} is still alive and whether the submitted job is still running. 
> If not, then the {{JobClientActor}} should return an exception to complete 
> the waiting future.





[GitHub] flink pull request: [FLINK-2472]Make JobClientActor poll JobManage...

2015-09-02 Thread sachingoel0101
Github user sachingoel0101 closed the pull request at:

https://github.com/apache/flink/pull/979




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38516042
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

Would it be possible to change the interface to an abstract class that 
provides a protected member `isRunning` (or similar) and implements a `public 
final void stop()` method that sets `isRunning = false`? Thus, we would 
ensure that the call is non-blocking. The user needs to check the `isRunning` flag 
in the `run()` method.


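The abstract-class proposal above can be sketched as a plain Java base class (all names here are hypothetical, not Flink's actual source interface): the final stop() only flips a volatile flag, so it can never block, and user code polls the flag in run().

```java
// Sketch of the proposal: stop() is final and merely clears a volatile flag,
// which guarantees the call is non-blocking; subclasses poll isRunning in run().
abstract class StoppableSource {
    // volatile so the write in stop() is visible to the thread executing run()
    protected volatile boolean isRunning = true;

    // final: user code cannot override this with blocking logic
    public final void stop() {
        isRunning = false;
    }

    // user-supplied loop; must check isRunning to terminate promptly
    public abstract void run();
}
```

Making stop() final moves the non-blocking guarantee from a Javadoc convention into the type system, which is exactly the concern raised earlier about users implementing the contract incorrectly.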


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727116#comment-14727116
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38516042
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -411,6 +411,23 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution ${executionID})")
       }
 
+    // stops a task
+    case StopTask(executionID) =>
+      val task = runningTasks.get(executionID)
+      if (task != null) {
+        try {
+          task.stopExecution()
--- End diff --

Would it be possible to change the interface to an abstract class that 
provides a protected member `isRunning` (or similar) and implements a `public 
final void stop()` method that sets `isRunning = false`? This would ensure 
that the call is non-blocking. The user then needs to check the `isRunning` 
flag in the `run()` method.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which 
> is a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38516254
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution 
${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

I think that's already how sources are supposed to behave. Kafka does it as 
well; the only thing that needs to be changed is removing the 
`consumer.shutdown()` call from `cancel()`.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727118#comment-14727118
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38516254
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution 
${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

I think that's already how sources are supposed to behave. Kafka does it as 
well; the only thing that needs to be changed is removing the 
`consumer.shutdown()` call from `cancel()`.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which 
> is a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929





[jira] [Created] (FLINK-2607) Shade plugin copies signature files from original jar into fat jar

2015-09-02 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-2607:
-

 Summary: Shade plugin copies signature files from original jar 
into fat jar
 Key: FLINK-2607
 URL: https://issues.apache.org/jira/browse/FLINK-2607
 Project: Flink
  Issue Type: Bug
  Components: Quickstarts
Affects Versions: 0.9, 0.10
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: 0.10


The Quickstart project contains a Maven configuration that builds a fat jar 
using the Maven Shade plugin. It copies the {{META-INF}} folder of the original 
jar into the fat jar as well. That can lead to a {{SecurityException}} when 
submitting jobs to the cluster because the signature file contained in the 
original jar is not suitable for the fat jar. 

{noformat}
 java.lang.SecurityException: Invalid signature file digest for Manifest main 
attributes
{noformat}

The solution is to change the configuration of the Shade plugin to ignore the 
signature files in the {{META-INF}} folder when copying the dependencies to the 
fat jar. 

See also:

http://zhentao-li.blogspot.ch/2012/06/maven-shade-plugin-invalid-signature.html
http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
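Following the links above, the usual shape of the fix is a filter in the Shade plugin configuration that excludes signature files from all dependencies. A sketch of such a filter (the exact placement inside the quickstart `pom.xml` is an assumption here, not taken from the actual quickstart configuration):

```xml
<!-- inside the maven-shade-plugin <configuration> element:
     drop signature files from every artifact copied into the fat jar -->
<filters>
  <filter>
    <artifact>*:*</artifact>
    <excludes>
      <exclude>META-INF/*.SF</exclude>
      <exclude>META-INF/*.DSA</exclude>
      <exclude>META-INF/*.RSA</exclude>
    </excludes>
  </filter>
</filters>
```

Without this, a signed dependency's `.SF`/`.RSA` files end up in the fat jar, and the JVM rejects the jar because the digests no longer match the merged contents.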





[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1085

[FLINK-2607][quickstart] ignore signature files when creating fat jar



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink quickstart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1085


commit 9bc513129755b82a9b71b3b21c7ab4d6a0305cf8
Author: Maximilian Michels 
Date:   2015-09-02T10:21:49Z

[FLINK-2607][quickstart] ignore signature files when creating fat jar




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2607) Shade plugin copies signature files from original jar into fat jar

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727142#comment-14727142
 ] 

ASF GitHub Bot commented on FLINK-2607:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1085

[FLINK-2607][quickstart] ignore signature files when creating fat jar



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink quickstart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1085


commit 9bc513129755b82a9b71b3b21c7ab4d6a0305cf8
Author: Maximilian Michels 
Date:   2015-09-02T10:21:49Z

[FLINK-2607][quickstart] ignore signature files when creating fat jar




> Shade plugin copies signature files from original jar into fat jar
> --
>
> Key: FLINK-2607
> URL: https://issues.apache.org/jira/browse/FLINK-2607
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The Quickstart project contains a Maven configuration that builds a fat jar 
> using the Maven Shade plugin. It copies the {{META-INF}} folder of the 
> original jar into the fat jar as well. That can lead to a 
> {{SecurityException}} when submitting jobs to the cluster because the 
> signature file contained in the original jar is not suitable for the fat jar. 
> {noformat}
>  java.lang.SecurityException: Invalid signature file digest for Manifest main 
> attributes
> {noformat}
> The solution is to change the configuration of the Shade plugin to ignore the 
> signature files in the {{META-INF}} folder when copying the dependencies to 
> the fat jar. 
> See also:
> http://zhentao-li.blogspot.ch/2012/06/maven-shade-plugin-invalid-signature.html
> http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar





[jira] [Created] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2015-09-02 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-2608:
-

 Summary: Arrays.asList(..) does not work with CollectionInputFormat
 Key: FLINK-2608
 URL: https://issues.apache.org/jira/browse/FLINK-2608
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 0.9, 0.10
Reporter: Maximilian Michels
Priority: Minor
 Fix For: 0.10


When using Arrays.asList(..) as input for a CollectionInputFormat, the 
serialization/deserialization fails when deploying the task.

See the following program:

{code:java}
public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
            ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");

        // DOES NOT WORK
        List<Integer> elements = Arrays.asList(0, 0, 0);
        // The following works:
        //List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .withBroadcastSet(set, "set")
            .groupBy(0)
            .sum(1);

        wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}
{code}

{noformat}
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
'DataSource (at main(Test.java:32) 
(org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Deserializing the InputFormat 
([mytests.Test$TestClass@4d6025c5]) failed: unread block data
at 
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
... 25 more
Caused by: java.lang.IllegalStateException: unre
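The underlying distinction can be shown without Flink at all (a standalone check, not code from the report): `Arrays.asList` does not return `java.util.ArrayList` but a private fixed-size inner class, `java.util.Arrays$ArrayList`, which is presumably what the InputFormat's serialization round-trip stumbles over.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListClassCheck {
    public static void main(String[] args) {
        // Arrays.asList returns a private, fixed-size inner class of Arrays,
        // not java.util.ArrayList
        List<Integer> fixed = Arrays.asList(0, 0, 0);
        System.out.println(fixed.getClass().getName()); // java.util.Arrays$ArrayList

        // copying into a real ArrayList yields the plain public collection class
        List<Integer> copy = new ArrayList<>(fixed);
        System.out.println(copy.getClass().getName());  // java.util.ArrayList
    }
}
```

Wrapping the list, as in the commented-out workaround above, sidesteps the private class entirely.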

[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-02 Thread Martin Liesenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727191#comment-14727191
 ] 

Martin Liesenberg commented on FLINK-2017:
--

OK, thanks for the feedback, I hope to have something to show by the weekend. 

> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas of how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}
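A minimal sketch of the `check()` semantics outlined in the test case above (the class body is an assumption for illustration; only the method names follow the proposed API, and the real implementation would work against the ParameterTool rather than a raw map):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration: every registered required parameter
// must be present in the parsed arguments, otherwise check() throws.
class RequiredParametersSketch {

    private final Set<String> required = new HashSet<>();

    // register a required parameter by its long name
    public RequiredParametersSketch add(String name) {
        required.add(name);
        return this;
    }

    // verify that all registered parameters were supplied
    public void check(Map<String, String> params) {
        for (String name : required) {
            if (!params.containsKey(name)) {
                throw new IllegalArgumentException(
                    "Missing required parameter: " + name);
            }
        }
    }

    public static void main(String[] args) {
        RequiredParametersSketch req = new RequiredParametersSketch();
        req.add("input").add("output");

        Map<String, String> params = new HashMap<>();
        params.put("input", "someinput");
        try {
            req.check(params); // "output" is missing, so this throws
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Short/long variants, defaults, types, and the help printout would layer on top of this core presence check.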





[jira] [Commented] (FLINK-2021) Rework examples to use new ParameterTool

2015-09-02 Thread Behrouz Derakhshan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727212#comment-14727212
 ] 

Behrouz Derakhshan commented on FLINK-2021:
---

Hi Robert, 
If no one has picked this up, I'd like to start working on it. Thanks... 

> Rework examples to use new ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}





[jira] [Commented] (FLINK-2021) Rework examples to use new ParameterTool

2015-09-02 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727226#comment-14727226
 ] 

Robert Metzger commented on FLINK-2021:
---

No, I think nobody is working on this.

Before changing all the examples, I would actually like to seek consensus on 
the mailing list. The change affects how new users learn to use Flink. 
Do you want to write to dev@flink or should I do it?

> Rework examples to use new ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}





[jira] [Commented] (FLINK-2603) Flink hangs before starting execution

2015-09-02 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727286#comment-14727286
 ] 

Greg Hogan commented on FLINK-2603:
---

Thanks, [~StephanEwen]. Changing {code}akka.framesize{code} to 100 MB resolved 
my problem, which was a JOIN followed by another JOIN similarly hanging 
mid-execution when increasing data size.
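For reference, `akka.framesize` is set in `conf/flink-conf.yaml`; the concrete value below (100 MB with Flink's byte-unit suffix) is an assumption based on the comment above, not a recommendation from the issue:

```yaml
# flink-conf.yaml -- raise the maximum Akka message size
# (presumably the serialized job graph, with the collection embedded in the
# CollectionInputFormat, exceeded the default of roughly 10 MB)
akka.framesize: 104857600b
```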

> Flink hangs before starting execution
> -
>
> Key: FLINK-2603
> URL: https://issues.apache.org/jira/browse/FLINK-2603
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Greg Hogan
>
> The following simple application, created from Flink's Java quickstart, will 
> hang before execution if the data size is too large. I have tested this on 
> both 0.9.1 and master (7364ce18) and the threshold occurs with
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 19
> running in a few seconds while
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 20
> seemingly hangs forever.
> I first put this together two months ago so if it is a bug it is not a new 
> bug.
> {code}
> package blocks;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.DiscardingOutputFormat;
> public class Job {
> public static void main(String[] args) throws Exception {
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> List<Long> data = new ArrayList<>();
> int count = 1 << Integer.valueOf(args[0]);
> 
> for (int i = 0 ; i < count ; i++) {
> data.add(0L);
> }
> env.fromCollection(data).output(new DiscardingOutputFormat<Long>());
> System.out.println("Ready to execute ...");
> 
> // execute program
> env.execute("Flink Java API Skeleton");
> }
> }
> {code}





[jira] [Comment Edited] (FLINK-2603) Flink hangs before starting execution

2015-09-02 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727286#comment-14727286
 ] 

Greg Hogan edited comment on FLINK-2603 at 9/2/15 12:54 PM:


Thanks, [~StephanEwen]. Changing {noformat}akka.framesize{noformat} to 100 MB 
resolved my problem, which was a JOIN followed by another JOIN similarly 
hanging mid-execution when increasing data size.


was (Author: greghogan):
Thanks, [~StephanEwen]. Changing {code}akka.framesize{code} to 100 MB resolved 
my problem, which was a JOIN followed by another JOIN similarly hanging 
mid-execution when increasing data size.

> Flink hangs before starting execution
> -
>
> Key: FLINK-2603
> URL: https://issues.apache.org/jira/browse/FLINK-2603
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Greg Hogan
>
> The following simple application, created from Flink's Java quickstart, will 
> hang before execution if the data size is too large. I have tested this on 
> both 0.9.1 and master (7364ce18) and the threshold occurs with
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 19
> running in a few seconds while
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 20
> seemingly hangs forever.
> I first put this together two months ago so if it is a bug it is not a new 
> bug.
> {code}
> package blocks;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.DiscardingOutputFormat;
> public class Job {
> public static void main(String[] args) throws Exception {
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> List<Long> data = new ArrayList<>();
> int count = 1 << Integer.valueOf(args[0]);
> 
> for (int i = 0 ; i < count ; i++) {
> data.add(0L);
> }
> env.fromCollection(data).output(new DiscardingOutputFormat<Long>());
> System.out.println("Ready to execute ...");
> 
> // execute program
> env.execute("Flink Java API Skeleton");
> }
> }
> {code}





[jira] [Comment Edited] (FLINK-2603) Flink hangs before starting execution

2015-09-02 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727286#comment-14727286
 ] 

Greg Hogan edited comment on FLINK-2603 at 9/2/15 12:58 PM:


Thanks, [~StephanEwen]. Changing {{akka.framesize}} to 100 MB resolved my 
problem, which was a JOIN followed by another JOIN similarly hanging 
mid-execution when increasing data size.


was (Author: greghogan):
Thanks, [~StephanEwen]. Changing {noformat}akka.framesize{noformat} to 100 MB 
resolved my problem, which was a JOIN followed by another JOIN similarly 
hanging mid-execution when increasing data size.

> Flink hangs before starting execution
> -
>
> Key: FLINK-2603
> URL: https://issues.apache.org/jira/browse/FLINK-2603
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Greg Hogan
>
> The following simple application, created from Flink's Java quickstart, will 
> hang before execution if the data size is too large. I have tested this on 
> both 0.9.1 and master (7364ce18) and the threshold occurs with
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 19
> running in a few seconds while
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 20
> seemingly hangs forever.
> I first put this together two months ago so if it is a bug it is not a new 
> bug.
> {code}
> package blocks;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.DiscardingOutputFormat;
> public class Job {
> public static void main(String[] args) throws Exception {
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> List<Long> data = new ArrayList<>();
> int count = 1 << Integer.valueOf(args[0]);
> 
> for (int i = 0 ; i < count ; i++) {
> data.add(0L);
> }
> env.fromCollection(data).output(new DiscardingOutputFormat<Long>());
> System.out.println("Ready to execute ...");
> 
> // execute program
> env.execute("Flink Java API Skeleton");
> }
> }
> {code}





[GitHub] flink pull request: [FLINK-2410] [java api] PojoTypeInfo is not co...

2015-09-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/943#discussion_r38529490
  
--- Diff: 
flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.io.Serializable;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+   public PojoGroupingITCase(TestExecutionMode mode) {
+   super(mode);
+   }
+
+   private String resultPath;
+   private String expected = "";
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   }
+
+   @Test
+   public void testPojoGrouping() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+       new Tuple3<String, Double, String>("A", 23.0, "Z"),
+       new Tuple3<String, Double, String>("A", 24.0, "Y"),
+       new Tuple3<String, Double, String>("B", 1.0, "Z"));
+
+   TableEnvironment tableEnv = new TableEnvironment();
+
+   Table table = tableEnv
+   .fromDataSet(data, "groupMe, value, name")
+   .select("groupMe, value, name")
+   .where("groupMe != 'B'");
+
+   DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+   DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+   .sortGroup("value", Order.DESCENDING)
+   .first(1);
+   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
--- End diff --

Thanks for the hint, I'm now using `collect`.




[jira] [Commented] (FLINK-2410) PojoTypeInfo is not completely serializable

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727311#comment-14727311
 ] 

ASF GitHub Bot commented on FLINK-2410:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/943#discussion_r38529490
  
--- Diff: 
flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.io.Serializable;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+   public PojoGroupingITCase(TestExecutionMode mode) {
+   super(mode);
+   }
+
+   private String resultPath;
+   private String expected = "";
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   }
+
+   @Test
+   public void testPojoGrouping() throws Exception {
+   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+   new Tuple3<String, Double, String>("A", 23.0, "Z"),
+   new Tuple3<String, Double, String>("A", 24.0, "Y"),
+   new Tuple3<String, Double, String>("B", 1.0, "Z"));
+
+   TableEnvironment tableEnv = new TableEnvironment();
+
+   Table table = tableEnv
+   .fromDataSet(data, "groupMe, value, name")
+   .select("groupMe, value, name")
+   .where("groupMe != 'B'");
+
+   DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+   DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+   .sortGroup("value", Order.DESCENDING)
+   .first(1);
+   result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
--- End diff --

Thanks for the hint, I'm now using `collect`.


> PojoTypeInfo is not completely serializable
> ---
>
> Key: FLINK-2410
> URL: https://issues.apache.org/jira/browse/FLINK-2410
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Table API requires PojoTypeInfo to be serializable. The following code fails:
> {code}
> Table finishedEtlTable = maxMeasurements
> .join(stationTable).where("s_station_id = m_station_id")
> .select("year, month, day, value, country, name");
> DataSet<MaxTemperature> maxTemp = tableEnv.toDataSet(finishedEtlTable, MaxTemperature.class);
> maxTemp
> .groupBy("year")
> .sortGroup("value", Order.DESCENDING)
> .first(1)
> .print();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2021) Rework examples to use new ParameterTool

2015-09-02 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727331#comment-14727331
 ] 

Robert Metzger commented on FLINK-2021:
---

Sure ;)

> Rework examples to use new ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}
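As a rough illustration of what {{ParameterTool.fromArgs}} does with {{--key value}} arguments, here is a simplified, self-contained stand-in; the class below is hypothetical and is not Flink's implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical sketch of "--key value" argument parsing of the
// kind ParameterTool.fromArgs performs; error handling and "-key" variants
// are omitted.
public class SimpleArgsParser {
    public static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        int i = 0;
        while (i < args.length) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // treat a following non-flag token as the value
                if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                    params.put(key, args[i + 1]);
                    i += 2;
                } else {
                    params.put(key, "");  // flag with no value
                    i += 1;
                }
            } else {
                i += 1;  // skip stray tokens
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = fromArgs(
            new String[]{"--input", "in.txt", "--output", "out.txt"});
        System.out.println(p.get("input") + " " + p.get("output"));  // prints "in.txt out.txt"
    }
}
```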





[jira] [Commented] (FLINK-2021) Rework examples to use new ParameterTool

2015-09-02 Thread Behrouz Derakhshan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727328#comment-14727328
 ] 

Behrouz Derakhshan commented on FLINK-2021:
---

First I will have a look at a couple of the examples, and then I will write to 
dev@flink and see what others think...

> Rework examples to use new ParameterTool
> 
>
> Key: FLINK-2021
> URL: https://issues.apache.org/jira/browse/FLINK-2021
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Priority: Minor
>  Labels: starter
>
> In FLINK-1525, we introduced the {{ParameterTool}}.
> We should port the examples to use the tool.
> The examples could look like this (we should maybe discuss it first on the 
> mailing lists):
> {code}
> public static void main(String[] args) throws Exception {
> ParameterTool pt = ParameterTool.fromArgs(args);
> boolean fileOutput = pt.getNumberOfParameters() == 2;
> String textPath = null;
> String outputPath = null;
> if(fileOutput) {
> textPath = pt.getRequired("input");
> outputPath = pt.getRequired("output");
> }
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setUserConfig(pt);
> {code}





[GitHub] flink pull request: [FLINK1919] add HCatOutputFormat

2015-09-02 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1064#issuecomment-137080649
  
@jamescao: It seems that you also wrote tests for the HCatInputFormat, 
right? Is it possible to split the PR into an OutputFormat part and open a 
separate PR for the HCatInputFormat tests? I'm still working on FLINK-2167 and 
require an HCatalog testing infrastructure; otherwise I have to write my own. 
Anyway, I wonder why all HCat I/O format classes have no tests so far...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2603) Flink hangs before starting execution

2015-09-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727334#comment-14727334
 ] 

Stephan Ewen commented on FLINK-2603:
-

Is the join now hanging or was it hanging before?

> Flink hangs before starting execution
> -
>
> Key: FLINK-2603
> URL: https://issues.apache.org/jira/browse/FLINK-2603
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Greg Hogan
>
> The following simple application created from Flink's Java quickstart will 
> hang before execution if the data size is too large. I have tested this on 
> both 0.9.1 and master (7364ce18) and the threshold occurs with
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 19
> running in a few seconds while
>   $ ./bin/flink run /path/to/blocks/target/blocks-1.0-SNAPSHOT.jar 20
> seemingly hangs forever.
> I first put this together two months ago so if it is a bug it is not a new 
> bug.
> {code}
> package blocks;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.DiscardingOutputFormat;
> public class Job {
> public static void main(String[] args) throws Exception {
> // set up the execution environment
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> List<Long> data = new ArrayList<>();
> int count = 1 << Integer.valueOf(args[0]);
> 
> for (int i = 0 ; i < count ; i++) {
> data.add(0L);
> }
> env.fromCollection(data).output(new DiscardingOutputFormat<Long>());
> System.out.println("Ready to execute ...");
> 
> // execute program
> env.execute("Flink Java API Skeleton");
> }
> }
> {code}
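For scale: the reported threshold sits between 2^19 and 2^20 elements. Assuming (this is an assumption about the cause, not a confirmed diagnosis) that the collection is serialized into the job graph and shipped to the JobManager in a single message, the raw payload roughly doubles across that threshold:

```java
public class PayloadEstimate {
    public static void main(String[] args) {
        // Raw payload of the Long collection at the two thresholds from the
        // report, ignoring per-element serialization overhead (which would
        // only make the messages larger).
        long bytesPerLong = 8;
        long runs  = (1L << 19) * bytesPerLong;  // arg 19: completes in seconds
        long hangs = (1L << 20) * bytesPerLong;  // arg 20: appears to hang
        System.out.println(runs);   // 4194304 bytes (~4 MB)
        System.out.println(hangs);  // 8388608 bytes (~8 MB)
    }
}
```

If transport limits are indeed the issue, doubling the data once more should make no qualitative difference, which would be one way to narrow the diagnosis down.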





[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727333#comment-14727333
 ] 

ASF GitHub Bot commented on FLINK-2167:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1064#issuecomment-137080649
  
@jamescao: It seems that you also wrote tests for the HCatInputFormat, 
right? Is it possible to split the PR into an OutputFormat part and open a 
separate PR for the HCatInputFormat tests? I'm still working on FLINK-2167 and 
require an HCatalog testing infrastructure; otherwise I have to write my own. 
Anyway, I wonder why all HCat I/O format classes have no tests so far...


> Add fromHCat() to TableEnvironment
> --
>
> Key: FLINK-2167
> URL: https://issues.apache.org/jira/browse/FLINK-2167
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Minor
>  Labels: starter
>
> Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} 
> from an HCatalog table.
> The implementation could reuse Flink's HCatInputFormat.





[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727359#comment-14727359
 ] 

ASF GitHub Bot commented on FLINK-2493:
---

Github user chenliang613 closed the pull request at:

https://github.com/apache/flink/pull/1068


> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}





[GitHub] flink pull request: [FLINK-2493] Simplify names of example program...

2015-09-02 Thread chenliang613
Github user chenliang613 closed the pull request at:

https://github.com/apache/flink/pull/1068




[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137088288
  
During a Flink training, many people were experiencing this problem. I'd 
also like to merge this to the Milestone branch.




[jira] [Commented] (FLINK-2607) Shade plugin copies signature files from original jar into fat jar

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727364#comment-14727364
 ] 

ASF GitHub Bot commented on FLINK-2607:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137088288
  
During a Flink training, many people were experiencing this problem. I'd 
also like to merge this to the Milestone branch.


> Shade plugin copies signature files from original jar into fat jar
> --
>
> Key: FLINK-2607
> URL: https://issues.apache.org/jira/browse/FLINK-2607
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The Quickstart project contains a Maven configuration that builds a fat jar 
> using the Maven Shade plugin. It copies the {{META-INF}} folder of the 
> original jar into the fat jar as well. That can lead to a 
> {{SecurityException}} when submitting jobs to the cluster because the 
> signature file contained in the original jar is not suitable for the fat jar. 
> {noformat}
>  java.lang.SecurityException: Invalid signature file digest for Manifest main 
> attributes
> {noformat}
> The solution is to change the configuration of the Shade plugin to ignore the 
> signature files in the {{META-INF}} folder when copying the dependencies to 
> the fat jar. 
> See also:
> http://zhentao-li.blogspot.ch/2012/06/maven-shade-plugin-invalid-signature.html
> http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
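For readers hitting the same {{SecurityException}}, the fix described above usually takes this shape in the Shade-plugin section of the quickstart {{pom.xml}}; this is a sketch of the standard signature-file filter, not necessarily the exact configuration committed for this issue:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <filters>
      <filter>
        <!-- drop signature files copied from signed dependency jars -->
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
</plugin>
```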





[GitHub] flink pull request: [FLINK-2545] add bucket member count verificat...

2015-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1067




[GitHub] flink pull request: [FLINK-2448]Create new Test environments on ge...

2015-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1031




[jira] [Commented] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727371#comment-14727371
 ] 

ASF GitHub Bot commented on FLINK-2545:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1067


> NegativeArraySizeException while creating hash table bloom filters
> --
>
> Key: FLINK-2545
> URL: https://issues.apache.org/jira/browse/FLINK-2545
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master
>Reporter: Greg Hogan
>Assignee: Chengxiang Li
>
> The following exception occurred a second time when I immediately re-ran my 
> application, though after recompiling and restarting Flink the subsequent 
> execution ran without error.
> java.lang.Exception: The data preparation for task '...' , caused an error: 
> null
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NegativeArraySizeException
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
>   at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
>   ... 3 more





[jira] [Commented] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727372#comment-14727372
 ] 

ASF GitHub Bot commented on FLINK-2448:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1031


> registerCacheFile fails with MultipleProgramsTestbase
> -
>
> Key: FLINK-2448
> URL: https://issues.apache.org/jira/browse/FLINK-2448
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.10
>
>
> When trying to register a file using a constant name, an exception is thrown 
> saying the file was already cached.
> This is probably because the same environment is reused, and the cacheFile 
> entries are not cleared between runs.





[jira] [Resolved] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2448.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via 166b3705c9079efafe9217d98e7edaf54e6a84cf

> registerCacheFile fails with MultipleProgramsTestbase
> -
>
> Key: FLINK-2448
> URL: https://issues.apache.org/jira/browse/FLINK-2448
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.10
>
>
> When trying to register a file using a constant name, an exception is thrown 
> saying the file was already cached.
> This is probably because the same environment is reused, and the cacheFile 
> entries are not cleared between runs.





[jira] [Resolved] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2545.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via 2e6e4de5d1d2b5123f4311493763fd84f52779ab

Thank you for the contribution!

> NegativeArraySizeException while creating hash table bloom filters
> --
>
> Key: FLINK-2545
> URL: https://issues.apache.org/jira/browse/FLINK-2545
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master
>Reporter: Greg Hogan
>Assignee: Chengxiang Li
> Fix For: 0.10
>
>
> The following exception occurred a second time when I immediately re-ran my 
> application, though after recompiling and restarting Flink the subsequent 
> execution ran without error.
> java.lang.Exception: The data preparation for task '...' , caused an error: 
> null
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NegativeArraySizeException
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
>   at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
>   ... 3 more





[jira] [Closed] (FLINK-2448) registerCacheFile fails with MultipleProgramsTestbase

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2448.
---

> registerCacheFile fails with MultipleProgramsTestbase
> -
>
> Key: FLINK-2448
> URL: https://issues.apache.org/jira/browse/FLINK-2448
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Sachin Goel
>Priority: Minor
> Fix For: 0.10
>
>
> When trying to register a file using a constant name, an exception is thrown 
> saying the file was already cached.
> This is probably because the same environment is reused, and the cacheFile 
> entries are not cleared between runs.





[jira] [Closed] (FLINK-2545) NegativeArraySizeException while creating hash table bloom filters

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2545.
---

> NegativeArraySizeException while creating hash table bloom filters
> --
>
> Key: FLINK-2545
> URL: https://issues.apache.org/jira/browse/FLINK-2545
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master
>Reporter: Greg Hogan
>Assignee: Chengxiang Li
> Fix For: 0.10
>
>
> The following exception occurred a second time when I immediately re-ran my 
> application, though after recompiling and restarting Flink the subsequent 
> execution ran without error.
> java.lang.Exception: The data preparation for task '...' , caused an error: 
> null
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:465)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NegativeArraySizeException
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucket(MutableHashTable.java:1160)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildBloomFilterForBucketsInPartition(MutableHashTable.java:1143)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1117)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:946)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:868)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildInitialTable(MutableHashTable.java:692)
>   at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.open(MutableHashTable.java:455)
>   at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator.open(ReusingBuildSecondHashMatchIterator.java:93)
>   at 
> org.apache.flink.runtime.operators.JoinDriver.prepare(JoinDriver.java:195)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:459)
>   ... 3 more





[jira] [Resolved] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2601.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via dd9979f8ffc7de178968298d8c21df7315633e9b

> IOManagerAsync may produce NPE during shutdown
> --
>
> Key: FLINK-2601
> URL: https://issues.apache.org/jira/browse/FLINK-2601
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Minor
>  Labels: test-stability
> Fix For: 0.10
>
>
> While analyzing a failed YARN test, I detected that it failed because it 
> found the following exception in the logs:
> taskmanager-stderr:
> {code}
> Exception in thread "I/O manager shutdown hook" java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:144)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> {code}
> taskmanager.log
> {code}
> 18:45:00,812 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Starting TaskManager actor
> 18:45:00,819 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig  
>- NettyConfig [server address: 
> testing-worker-linux-docker-56ee9bbf-3203-linux-2.prod.travis-ci.org/172.17.9.129,
>  server port: 38689, memory segment size (bytes): 32768, transport type: NIO, 
> number of server threads: 0 (use Netty's default), number of client threads: 
> 0 (use Netty's default), server connect backlog: 0 (use Netty's default), 
> client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use 
> Netty's default)]
> 18:45:00,822 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Messages between TaskManager and JobManager have a max timeout of 10 
> milliseconds
> 18:45:00,825 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Temporary file directory 
> '/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007':
>  total 15 GB, usable 7 GB (46.67% usable)
> 18:45:00,929 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 
> MB for network buffer pool (number of memory segments: 2048, bytes per 
> segment: 32768).
> 18:45:01,186 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Using 0.7 of the currently free heap space for Flink managed memory (236 
> MB).
> 18:45:01,755 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager uses directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
>  for spill files.
> 18:45:01,831 ERROR org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  
>- RECEIVED SIGNAL 15: SIGTERM
> 18:45:01,833 ERROR org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync  
>- Error while shutting down IO Manager reader thread.
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:133)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> 18:45:01,841 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager removed spill file directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
> {code}
> Looks like the TM is shutting down while still starting up. Hardening this 
> should be easy.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/78052378/log.txt
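The hardening mentioned above could look like the following self-contained sketch: a shutdown that tolerates being called before startup has finished (e.g. from a JVM shutdown hook after SIGTERM) and being called twice. All names here are hypothetical and illustrative, not Flink's actual IOManagerAsync code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a null-safe, idempotent shutdown for an async I/O
// service whose worker threads may not have been created yet when a
// shutdown hook fires.
public class AsyncIoService {
    private volatile Thread readerThread;  // may still be null mid-startup
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);

    public void start() {
        readerThread = new Thread(() -> { /* drain read requests */ });
        readerThread.start();
    }

    public void shutdown() {
        // idempotent: a shutdown hook and an explicit shutdown may race
        if (!isShutdown.compareAndSet(false, true)) {
            return;
        }
        Thread t = readerThread;
        if (t != null) {  // guard against shutdown arriving during startup
            t.interrupt();
            try {
                t.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Reading the thread field into a local variable once avoids a check-then-use race on the volatile field, and the compare-and-set makes a second shutdown call a no-op.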





[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137093481
  
Very good fix!

+1 to merge both into `master` and `0.10-milestone1`.




[jira] [Commented] (FLINK-2607) Shade plugin copies signature files from original jar into fat jar

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727385#comment-14727385
 ] 

ASF GitHub Bot commented on FLINK-2607:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137093481
  
Very good fix!

+1 to merge both into `master` and `0.10-milestone1`.


> Shade plugin copies signature files from original jar into fat jar
> --
>
> Key: FLINK-2607
> URL: https://issues.apache.org/jira/browse/FLINK-2607
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The Quickstart project contains a Maven configuration that builds a fat jar 
> using the Maven Shade plugin. It copies the {{META-INF}} folder of the 
> original jar into the fat jar as well. That can lead to a 
> {{SecurityException}} when submitting jobs to the cluster because the 
> signature file contained in the original jar is not suitable for the fat jar. 
> {noformat}
>  java.lang.SecurityException: Invalid signature file digest for Manifest main 
> attributes
> {noformat}
> The solution is to change the configuration of the Shade plugin to ignore the 
> signature files in the {{META-INF}} folder when copying the dependencies to 
> the fat jar. 
> See also:
> http://zhentao-li.blogspot.ch/2012/06/maven-shade-plugin-invalid-signature.html
> http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar





[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727391#comment-14727391
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137094936
  
Looks very good from a first glance!

Can you explain how the writing behaves in case of failures? Will it start 
a new file? Will it try to append to the previous one? What about incomplete 
records?


> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.





[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137094936
  
Looks very good from a first glance!

Can you explain how the writing behaves in cases of failures? Will it start 
a new file? Will it try to append to the previous one? What about incomplete 
records?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2493] Simplify names of example program...

2015-09-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1068#issuecomment-137095444
  
@chenliang613 Do you want to open another pull request with the fix?




[jira] [Commented] (FLINK-2607) Shade plugin copies signature files from original jar into fat jar

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727395#comment-14727395
 ] 

ASF GitHub Bot commented on FLINK-2607:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137095491
  
+1 to merge


> Shade plugin copies signature files from original jar into fat jar
> --
>
> Key: FLINK-2607
> URL: https://issues.apache.org/jira/browse/FLINK-2607
> Project: Flink
>  Issue Type: Bug
>  Components: Quickstarts
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The Quickstart project contains a Maven configuration that builds a fat jar 
> using the Maven Shade plugin. It copies the {{META-INF}} folder of the 
> original jar into the fat jar as well. That can lead to a 
> {{SecurityException}} when submitting jobs to the cluster because the 
> signature file contained in the original jar is not suitable for the fat jar. 
> {noformat}
>  java.lang.SecurityException: Invalid signature file digest for Manifest main 
> attributes
> {noformat}
> The solution is to change the configuration of the Shade plugin to ignore the 
> signature files in the {{META-INF}} folder when copying the dependencies to 
> the fat jar. 
> See also:
> http://zhentao-li.blogspot.ch/2012/06/maven-shade-plugin-invalid-signature.html
> http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
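The change the ticket describes can be sketched as a Shade-plugin filter that drops signature files from every dependency while shading. The surrounding plugin coordinates are illustrative; the actual quickstart pom may differ:

```xml
<!-- Sketch of the proposed fix: exclude signature files when shading.
     Plugin coordinates are illustrative; the actual quickstart pom may differ. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <filters>
      <filter>
        <!-- applies to every dependency copied into the fat jar -->
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
</plugin>
```

Without such a filter, the first signed dependency copied into the fat jar carries signature files whose digests no longer match the merged manifest, which is exactly the {{SecurityException}} quoted above.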





[GitHub] flink pull request: [FLINK-2607][quickstart] ignore signature file...

2015-09-02 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1085#issuecomment-137095491
  
+1 to merge




[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727394#comment-14727394
 ] 

ASF GitHub Bot commented on FLINK-2493:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1068#issuecomment-137095444
  
@chenliang613 Do you want to open another pull request with the fix?


> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}





[jira] [Closed] (FLINK-2601) IOManagerAsync may produce NPE during shutdown

2015-09-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2601.
---

> IOManagerAsync may produce NPE during shutdown
> --
>
> Key: FLINK-2601
> URL: https://issues.apache.org/jira/browse/FLINK-2601
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
>Priority: Minor
>  Labels: test-stability
> Fix For: 0.10
>
>
> While analyzing a failed YARN test, I detected that it failed because it 
> found the following exception in the logs:
> taskmanager-stderr:
> {code}
> Exception in thread "I/O manager shutdown hook" java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:144)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> {code}
> taskmanager.log
> {code}
> 18:45:00,812 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Starting TaskManager actor
> 18:45:00,819 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig  
>- NettyConfig [server address: 
> testing-worker-linux-docker-56ee9bbf-3203-linux-2.prod.travis-ci.org/172.17.9.129,
>  server port: 38689, memory segment size (bytes): 32768, transport type: NIO, 
> number of server threads: 0 (use Netty's default), number of client threads: 
> 0 (use Netty's default), server connect backlog: 0 (use Netty's default), 
> client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use 
> Netty's default)]
> 18:45:00,822 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Messages between TaskManager and JobManager have a max timeout of 10 
> milliseconds
> 18:45:00,825 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Temporary file directory 
> '/home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007':
>  total 15 GB, usable 7 GB (46.67% usable)
> 18:45:00,929 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 
> MB for network buffer pool (number of memory segments: 2048, bytes per 
> segment: 32768).
> 18:45:01,186 INFO  org.apache.flink.runtime.taskmanager.TaskManager   
>- Using 0.7 of the currently free heap space for Flink managed memory (236 
> MB).
> 18:45:01,755 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager uses directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
>  for spill files.
> 18:45:01,831 ERROR org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  
>- RECEIVED SIGNAL 15: SIGTERM
> 18:45:01,833 ERROR org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync  
>- Error while shutting down IO Manager reader thread.
> java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:133)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.IOManager$1.run(IOManager.java:103)
> 18:45:01,841 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager   
>- I/O manager removed spill file directory 
> /home/travis/build/rmetzger/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-localDir-nm-1_0/usercache/travis/appcache/application_1441046584836_0007/flink-io-1befed3c-89c5-4b5e-9043-1b92c4c047d4
> {code}
> Looks like the TM is shutting down while still starting up. Hardening this 
> should be easy.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/78052378/log.txt
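The "hardening" suggested above boils down to making the shutdown hook tolerate a shutdown that races startup, i.e. fields that are still null when SIGTERM arrives mid-initialization. A minimal self-contained sketch of the pattern (class and field names are hypothetical, not Flink's actual IOManagerAsync code):

```java
// Hypothetical sketch: a shutdown path hardened against fields that a
// signal-triggered shutdown may observe before startup has finished.
class AsyncIoService {
    private volatile Thread[] readers; // still null until startup() runs

    void startup() {
        readers = new Thread[] { new Thread(() -> {}), new Thread(() -> {}) };
        for (Thread t : readers) {
            t.start();
        }
    }

    void shutdown() {
        Thread[] rs = readers; // read once; may be null if startup raced us
        if (rs == null) {
            return; // nothing was started, so there is nothing to stop
        }
        for (Thread t : rs) {
            t.interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncIoService s = new AsyncIoService();
        s.shutdown(); // safe even before startup: no NullPointerException
        s.startup();
        s.shutdown();
        System.out.println("shutdown is safe before and after startup");
    }
}
```

The key point is reading the field once into a local and null-checking it, so the hook neither throws nor observes a half-torn-down state.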





[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

2015-09-02 Thread gdfm
Github user gdfm commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-137096426
  
@tillrohrmann the reason why 2 is a "magic number" is the following.
Imagine channels as containers, and the load (the incoming data stream) as 
a liquid.
When a key is split in two channels, it creates a "bridge" between these 
channels.
You can imagine it as a pipe between the two specific containers that 
enables sharing the load, and brings the liquid to the same level (this happens 
thanks to the fact that the new messages are sent to the least loaded of the 
two containers).
Now, if you have enough of these pipes between pairs of containers, you 
will effectively establish a network of load sharing among them. Any increase 
in pressure on one of the containers can be shared across the network 
effectively, which brings the load balance.
The trick is to have "enough" keys to create enough pipes.
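The container/pipe picture above can be sketched as a self-contained "power of two choices" partitioner: each key maps to two candidate channels, and every record goes to the currently less-loaded of the two. The hash functions and load counters below are simplified illustrations, not the actual pull-request code:

```java
import java.util.Arrays;

// Simplified sketch of partial key grouping with two choices per key.
// Each key gets two candidate channels (the "pipe" between containers);
// every record is routed to whichever candidate is currently less loaded.
class PartialKeyGrouping {
    private final long[] load; // records sent to each channel so far

    PartialKeyGrouping(int numChannels) {
        this.load = new long[numChannels];
    }

    int partition(Object key) {
        // two (illustrative) hash choices for the same key
        int c1 = Math.floorMod(key.hashCode(), load.length);
        int c2 = Math.floorMod(key.hashCode() * 31 + 17, load.length);
        int chosen = load[c1] <= load[c2] ? c1 : c2;
        load[chosen]++;
        return chosen;
    }

    long[] loads() {
        return load.clone();
    }

    public static void main(String[] args) {
        PartialKeyGrouping p = new PartialKeyGrouping(4);
        // heavily skewed stream: one hot key plus a few rare ones
        for (int i = 0; i < 1000; i++) p.partition("hot-key");
        for (int i = 0; i < 10; i++) p.partition("rare-" + i);
        System.out.println(Arrays.toString(p.loads()));
    }
}
```

Running this, the hot key's records are split between its two candidate channels instead of piling onto one, which is the load-sharing "bridge" described above.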


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1725) New Partitioner for better load balancing for skewed data

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727399#comment-14727399
 ] 

ASF GitHub Bot commented on FLINK-1725:
---

Github user gdfm commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-137096426
  
@tillrohrmann the reason why 2 is a "magic number" is the following.
Imagine channels as containers, and the load (the incoming data stream) as 
a liquid.
When a key is split in two channels, it creates a "bridge" between these 
channels.
You can imagine it as a pipe between the two specific containers that 
enables sharing the load, and brings the liquid to the same level (this happens 
thanks to the fact that the new messages are sent to the least loaded of the 
two containers).
Now, if you have enough of these pipes between pairs of containers, you 
will effectively establish a network of load sharing among them. Any increase 
in pressure on one of the containers can be shared across the network 
effectively, which brings the load balance.
The trick is to have "enough" keys to create enough pipes.


> New Partitioner for better load balancing for skewed data
> -
>
> Key: FLINK-1725
> URL: https://issues.apache.org/jira/browse/FLINK-1725
> Project: Flink
>  Issue Type: Improvement
>  Components: New Components
>Affects Versions: 0.8.1
>Reporter: Anis Nasir
>Assignee: Anis Nasir
>  Labels: LoadBalancing, Partitioner
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hi,
> We have recently studied the problem of load balancing in Storm [1].
> In particular, we focused on key distribution of the stream for skewed data.
> We developed a new stream partitioning scheme (which we call Partial Key 
> Grouping). It achieves better load balancing than key grouping while being 
> more scalable than shuffle grouping in terms of memory.
> In the paper we show a number of mining algorithms that are easy to implement 
> with partial key grouping, and whose performance can benefit from it. We 
> think that it might also be useful for a larger class of algorithms.
> Partial key grouping is very easy to implement: it requires just a few lines 
> of code in Java when implemented as a custom grouping in Storm [2].
> For all these reasons, we believe it will be a nice addition to the standard 
> Partitioners available in Flink. If the community thinks it's a good idea, we 
> will be happy to offer support in the porting.
> References:
> [1]. 
> https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
> [2]. https://github.com/gdfm/partial-key-grouping





[jira] [Commented] (FLINK-2605) Unclosed RandomAccessFile may leak resource in StaticFileServerHandler

2015-09-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727404#comment-14727404
 ] 

Stephan Ewen commented on FLINK-2605:
-

Good observation, thank you!

Do you want to prepare a fix for this?

> Unclosed RandomAccessFile may leak resource in StaticFileServerHandler
> --
>
> Key: FLINK-2605
> URL: https://issues.apache.org/jira/browse/FLINK-2605
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> {code}
> final RandomAccessFile raf;
> try {
>   raf = new RandomAccessFile(file, "r");
> }
> catch (FileNotFoundException e) {
>   sendError(ctx, NOT_FOUND);
>   return;
> }
> {code}
> If there is exception after raf is created but before method returns, raf 
> would be left unclosed.
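A generic sketch of the fix: once the {{RandomAccessFile}} is open, every exit path must close it. This illustrates the try/finally pattern only; in the real StaticFileServerHandler the file has to stay open while Netty streams it, so there the close belongs on the exception path rather than in an unconditional finally:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;

// Generic illustration of the leak fix, not the actual Flink patch:
// after the RandomAccessFile is opened, any later failure must not
// leave it unclosed.
class SafeFileRead {
    static long fileLength(File file) throws IOException {
        final RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            return -1; // stand-in for sendError(ctx, NOT_FOUND)
        }
        try {
            return raf.length(); // anything thrown here no longer leaks raf
        } finally {
            raf.close();
        }
    }

    public static void main(String[] args) throws IOException {
        File f = Files.createTempFile("flink-2605", ".tmp").toFile();
        Files.write(f.toPath(), new byte[] {1, 2, 3});
        System.out.println(fileLength(f)); // prints 3
        f.delete();
    }
}
```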





[jira] [Commented] (FLINK-2493) Simplify names of example program JARs

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727409#comment-14727409
 ] 

ASF GitHub Bot commented on FLINK-2493:
---

Github user chenliang613 commented on the pull request:

https://github.com/apache/flink/pull/1068#issuecomment-137098642
  
@StephanEwen yes, may use another solution :)


> Simplify names of example program JARs
> --
>
> Key: FLINK-2493
> URL: https://issues.apache.org/jira/browse/FLINK-2493
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: chenliang
>Priority: Minor
>  Labels: easyfix, starter
>
> I find the names of the example JARs a bit annoying.
> Why not name the file {{examples/ConnectedComponents.jar}} rather than 
> {{examples/flink-java-examples-0.10-SNAPSHOT-ConnectedComponents.jar}}





[GitHub] flink pull request: [FLINK-2493] Simplify names of example program...

2015-09-02 Thread chenliang613
Github user chenliang613 commented on the pull request:

https://github.com/apache/flink/pull/1068#issuecomment-137098642
  
@StephanEwen yes, may use another solution :)




[jira] [Commented] (FLINK-2602) Gelly algorithms obtain new execution environments.

2015-09-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727411#comment-14727411
 ] 

Stephan Ewen commented on FLINK-2602:
-

Actually, I saw this in one spot that was already fixed here (by 
Sachin as part of another pull request): 
https://github.com/apache/flink/commit/5d98e77cd740dc991978dbb1d026954f733786a8 

Maybe you find more cases, if not, feel free to close this issue.

> Gelly algorithms obtain new execution environments.
> ---
>
> Key: FLINK-2602
> URL: https://issues.apache.org/jira/browse/FLINK-2602
> Project: Flink
>  Issue Type: Bug
>  Components: Gelly
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Martin Junghanns
>
> I have seen that Gelly occasionally uses 
> {{ExecutionEnvironment.getExecutionEnvironment()}} to obtain an execution 
> environment.
> That easily leads to problems as it creates new execution environments in 
> many cases. For example if the original execution environment was created via 
> {{ExecutionEnvironment.createRemoteEnvironment(...)}}, it will inevitably 
> give you a wrong execution environment.
> If new sources need to be created, they should be created with the execution 
> environment of the other data sets in the computation, for example via 
> {{vertexDataSet.getExecutionEnvironment().fromElements(a, b, c)}}.





[jira] [Commented] (FLINK-2161) Flink Scala Shell does not support external jars (e.g. Gelly, FlinkML)

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727426#comment-14727426
 ] 

ASF GitHub Bot commented on FLINK-2161:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/805#issuecomment-137104511
  
Sorry, it fell behind in the long list of Pull Requests.
I'll check it and will merge it if Travis gives a green light.


> Flink Scala Shell does not support external jars (e.g. Gelly, FlinkML)
> --
>
> Key: FLINK-2161
> URL: https://issues.apache.org/jira/browse/FLINK-2161
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Nikolaas Steenbergen
>
> Currently, there is no easy way to load and ship external libraries/jars with 
> Flink's Scala shell. Assume that you want to run some Gelly graph algorithms 
> from within the Scala shell, then you have to put the Gelly jar manually in 
> the lib directory and make sure that this jar is also available on your 
> cluster, because it is not shipped with the user code. 
> It would be good to have a simple mechanism how to specify additional jars 
> upon startup of the Scala shell. These jars should then also be shipped to 
> the cluster.





[GitHub] flink pull request: [FLINK-2161] modified Scala shell start script...

2015-09-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/805#issuecomment-137104511
  
Sorry, it fell behind in the long list of Pull Requests.
I'll check it and will merge it if Travis gives a green light.




[GitHub] flink pull request: [FLINK-2488][FLINK-2496] Expose Task Manager c...

2015-09-02 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-137116109
  
Travis passes successfully. 
https://travis-ci.org/apache/flink/builds/78368635




[jira] [Commented] (FLINK-2488) Expose attemptNumber in RuntimeContext

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727471#comment-14727471
 ] 

ASF GitHub Bot commented on FLINK-2488:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1026#issuecomment-137116109
  
Travis passes successfully. 
https://travis-ci.org/apache/flink/builds/78368635


> Expose attemptNumber in RuntimeContext
> --
>
> Key: FLINK-2488
> URL: https://issues.apache.org/jira/browse/FLINK-2488
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, TaskManager
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
>Priority: Minor
>
> It would be nice to expose the attemptNumber of a task in the 
> {{RuntimeContext}}. 
> This would allow user code to behave differently in restart scenarios.





[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727484#comment-14727484
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137125224
  
If it fails in the middle of writing or before sync/flush is called on the 
writer then the data can be in an inconsistent state. I see three ways of 
dealing with this, one is more long-term.

The long term solution is to make the sink exactly-once aware. Either using 
truncate() support in Hadoop 2.7 or a custom Thread that does merging of part 
files and throwing away of data that was erroneously written.

The two short term options are:
 - Keep it as it is, consumers need to be able to deal with corrupt records 
and ignore them. This would give you at-least-once semantics.
 - Write to a temporary file. When rolling, close the current bucket and 
rename the file to the final filename. This would ensure that the output 
doesn't contain corrupt records but you would have neither at-least-once nor 
exactly-once semantics because some written records would be lost if checkpoint 
restore restores to a state after the writing of the current bucket file 
started.
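The second short-term option can be sketched with local files standing in for HDFS: records go to an in-progress file, and rolling publishes the bucket via an atomic rename, so readers never see a partially written part file. Names and the single-bucket structure are illustrative, not the sink's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Illustrative sketch of "write to temp, rename on roll". Local java.nio
// stands in for the HDFS FileSystem API; the real sink's structure differs.
class RollingBucketWriter {
    private final Path dir;
    private Path inProgress;
    private int part = 0;

    RollingBucketWriter(Path dir) {
        this.dir = dir;
    }

    void write(String record) throws IOException {
        if (inProgress == null) {
            inProgress = dir.resolve("part-" + part + ".in-progress");
        }
        Files.write(inProgress, (record + "\n").getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Roll: close the current bucket and publish it under its final name,
    // so only complete files are ever visible to readers.
    void roll() throws IOException {
        if (inProgress == null) return;
        Files.move(inProgress, dir.resolve("part-" + part),
                StandardCopyOption.ATOMIC_MOVE);
        inProgress = null;
        part++;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rolling-sink");
        RollingBucketWriter w = new RollingBucketWriter(dir);
        w.write("a");
        w.write("b");
        w.roll(); // part-0 becomes visible only now
        System.out.println(Files.readAllLines(dir.resolve("part-0"))); // prints [a, b]
    }
}
```

The semantics gap discussed above is visible here: records written to the in-progress file before a crash are simply discarded, so the output is clean but not at-least-once.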


> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.





[jira] [Created] (FLINK-2609) Automatic type registration is only called from the batch execution environment

2015-09-02 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-2609:
-

 Summary: Automatic type registration is only called from the batch 
execution environment
 Key: FLINK-2609
 URL: https://issues.apache.org/jira/browse/FLINK-2609
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Robert Metzger
Assignee: Robert Metzger


Kryo types in the streaming API are quite expensive to serialize because they 
are not automatically registered at Kryo.





[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137125224
  
If it fails in the middle of writing or before sync/flush is called on the 
writer then the data can be in an inconsistent state. I see three ways of 
dealing with this, one is more long-term.

The long term solution is to make the sink exactly-once aware. Either using 
truncate() support in Hadoop 2.7 or a custom Thread that does merging of part 
files and throwing away of data that was erroneously written.

The two short term options are:
 - Keep it as it is, consumers need to be able to deal with corrupt records 
and ignore them. This would give you at-least-once semantics.
 - Write to a temporary file. When rolling, close the current bucket and 
rename the file to the final filename. This would ensure that the output 
doesn't contain corrupt records but you would have neither at-least-once nor 
exactly-once semantics because some written records would be lost if checkpoint 
restore restores to a state after the writing of the current bucket file 
started.




[jira] [Commented] (FLINK-1725) New Partitioner for better load balancing for skewed data

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727532#comment-14727532
 ] 

ASF GitHub Bot commented on FLINK-1725:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-137140076
  
@gdfm, thanks for the great explanation. A good way to visualize what's 
happening.

I'm just wondering why you shouldn't be able to connect more than 2 
containers, let's say 3. In cases where you have an extremely high data skew, 
this might be helpful. Imagine that you have 10 containers and only 2 of them 
are full. Then in the best case you'll get 4 half-filled containers after 
connecting two containers. But this still leaves 6 unused containers. Wouldn't 
it be better to connect for example 5 containers in this case? Then in the best 
case you would use all available containers. But of course this strongly 
depends on your actual data and therefore I'd vote to make the number of 
connected containers configurable.


> New Partitioner for better load balancing for skewed data
> -
>
> Key: FLINK-1725
> URL: https://issues.apache.org/jira/browse/FLINK-1725
> Project: Flink
>  Issue Type: Improvement
>  Components: New Components
>Affects Versions: 0.8.1
>Reporter: Anis Nasir
>Assignee: Anis Nasir
>  Labels: LoadBalancing, Partitioner
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hi,
> We have recently studied the problem of load balancing in Storm [1].
> In particular, we focused on key distribution of the stream for skewed data.
> We developed a new stream partitioning scheme (which we call Partial Key 
> Grouping). It achieves better load balancing than key grouping while being 
> more scalable than shuffle grouping in terms of memory.
> In the paper we show a number of mining algorithms that are easy to implement 
> with partial key grouping, and whose performance can benefit from it. We 
> think that it might also be useful for a larger class of algorithms.
> Partial key grouping is very easy to implement: it requires just a few lines 
> of code in Java when implemented as a custom grouping in Storm [2].
> For all these reasons, we believe it will be a nice addition to the standard 
> Partitioners available in Flink. If the community thinks it's a good idea, we 
> will be happy to offer support in the porting.
> References:
> [1]. 
> https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
> [2]. https://github.com/gdfm/partial-key-grouping





[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

2015-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-137140076
  
@gdfm, thanks for the great explanation. A good way to visualize what's 
happening.

I'm just wondering why you shouldn't be able to connect more than 2 
containers, let's say 3. In cases where you have an extremely high data skew, 
this might be helpful. Imagine that you have 10 containers and only 2 of them 
are full. Then in the best case you'll get 4 half-filled containers after 
connecting two containers. But this still leaves 6 unused containers. Wouldn't 
it be better to connect for example 5 containers in this case? Then in the best 
case you would use all available containers. But of course this strongly 
depends on your actual data and therefore I'd vote to make the number of 
connected containers configurable.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727540#comment-14727540
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137143368
  
Concerning the second short term option: Is the problem there that the 
checkpointing is not aligned with the rolling? Thus, you can take a checkpoint 
but you still have a temporary file which contains all the records up to the 
checkpoint. If the sink now fails before renaming the temporary file, then the 
records which have been written before the checkpoint was taken are lost. Did I 
understand it correctly? If this is the case, then the records in the temporary 
file should actually be part of the state of the sink, right? 


> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.





[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-02 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137143368
  
Concerning the second short term option: Is the problem there that the 
checkpointing is not aligned with the rolling? Thus, you can take a checkpoint 
but you still have a temporary file which contains all the records up to the 
checkpoint. If the sink now fails before renaming the temporary file, then the 
records which have been written before the checkpoint was taken are lost. Did I 
understand it correctly? If this is the case, then the records in the temporary 
file should actually be part of the state of the sink, right? 




[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137144072
  
Should be, but then we can also just keep them in memory and write when 
checkpointing. But this has more potential of blowing up because of OOM.






[GitHub] flink pull request: [FLINK-1725]- New Partitioner for better load ...

2015-09-02 Thread gdfm
Github user gdfm commented on the pull request:

https://github.com/apache/flink/pull/1069#issuecomment-137144309
  
Sure, you can connect multiple containers.
But while the gain from going from 1 choice to 2 is exponential, the gain from 
2 to 3 and beyond is only a constant factor. Nevertheless, there might be 
datasets with extreme skew for which having more choices is necessary. So I 
agree to make it configurable with a default of 2.
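
The power-of-d-choices routing being discussed can be sketched like this (an illustrative helper, not the FLINK-1725 implementation; `loads` is an assumed per-channel counter):

```python
import random

def pick_channel(loads, d=2, rng=random):
    """Power-of-d-choices partitioning: sample d candidate channels and
    route to the least loaded one. d=2 already reduces the expected maximum
    load exponentially compared with d=1 (plain random routing); larger d
    only adds a constant factor, which is why 2 is a sensible default for
    the configurable parameter."""
    candidates = rng.sample(range(len(loads)), d)
    target = min(candidates, key=lambda c: loads[c])
    loads[target] += 1
    return target
```

Routing a stream of records through `pick_channel` with `d=2` keeps the per-channel counts tightly balanced even without any knowledge of key skew.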



