[GitHub] flink pull request: [FLINK-441] Rename Pact* and Nephele* classes ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/492#issuecomment-83655940 +1 SHIP IT --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1739]Fix the bug of JobManager and Task...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/499#discussion_r26686143 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1274,9 +1274,10 @@ object TaskManager { "' is missing (hostname/address of JobManager to connect to).") } -if (port <= 0) { +if (port <= 0 || port >= 65535) { --- End diff -- Should this be 65536 instead of 65535?
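Port numbers are unsigned 16-bit values, so 65535 is itself a legal port. The minimal Java sketch below contrasts the two upper bounds. It assumes the condition in the diff reads `port <= 0 || port >= 65535` (the comparison operators appear to have been lost in the archive). The class and method names are hypothetical and are not Flink's TaskManager code.

```java
// Hypothetical helpers contrasting the two upper bounds; not Flink's TaskManager code.
final class PortRangeCheck {

    // Port numbers are unsigned 16-bit values, so 65535 is the highest legal port.
    static final int MAX_PORT = 65535;

    // Rejection condition as assumed from the diff:
    // it also rejects 65535, which is a legal port.
    static boolean rejectsAsInDiff(int port) {
        return port <= 0 || port >= 65535;
    }

    // Rejection condition with the bound the reviewer suggests:
    // equivalent to port <= 0 || port > MAX_PORT.
    static boolean rejectsWithSuggestedBound(int port) {
        return port <= 0 || port >= MAX_PORT + 1;
    }

    public static void main(String[] args) {
        System.out.println(rejectsAsInDiff(65535));           // true
        System.out.println(rejectsWithSuggestedBound(65535)); // false
        System.out.println(rejectsWithSuggestedBound(65536)); // true
    }
}
```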
[GitHub] flink pull request: FLINK-1740
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/501#discussion_r26704861 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java --- @@ -276,7 +276,7 @@ public void testPartialReduceWithDifferentInputOutputType() throws Exception { // check if no shuffle is being executed public void testCheckPartitionShuffleGroupBy() throws Exception { - org.junit.Assume.assumeTrue(mode != ExecutionMode.COLLECTION); + org.junit.Assume.assumeTrue(mode != TestExecutionMode.COLLECTION); --- End diff -- I have committed this fix. If you rebase from master then you should get this change.
[GitHub] flink pull request: FLINK-1740
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/501#discussion_r26704856 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupCombineITCase.java --- @@ -58,7 +58,7 @@ */ public class GroupCombineITCase extends MultipleProgramsTestBase { - public GroupCombineITCase(ExecutionMode mode) { + public GroupCombineITCase(TestExecutionMode mode) { --- End diff -- I have committed this fix. If you rebase from master then you should get this change.
[GitHub] flink pull request: [FLINK-1739]Fix the bug of JobManager and Task...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/499#issuecomment-83150196 +1 LGTM
[GitHub] flink pull request: Remove -j and -a parameters which seemed no lo...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/482#issuecomment-82605275 This is merged by @StephanEwen with http://git-wip-us.apache.org/repos/asf/flink/commit/72ca014e. Thanks Stephan.
[GitHub] flink pull request: Remove -j and -a parameters which seemed no lo...
Github user hsaputra closed the pull request at: https://github.com/apache/flink/pull/482
[GitHub] flink pull request: Add final modifier to PollingThread#lock objec...
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/494 Add final modifier to PollingThread#lock object Add final modifier to PollingThread#lock object to ensure the lock object reference is immutable. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink add_final_lock_pollingthread Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/494.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #494 commit 44d550312b029f755129dee74bdbea528256bb23 Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-03-17T23:14:44Z Add final modifier to PollingThread#lock object to make sure immutability.
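Below is a minimal sketch of the pattern. It uses a hypothetical class rather than Flink's actual PollingThread. Marking the lock `final` does not make the code thread-safe by itself; the `synchronized` blocks do that. What `final` guarantees is that the monitor reference is never reassigned, so every thread synchronizes on the same object.

```java
// Hypothetical polling worker illustrating a final lock field; not Flink's PollingThread.
final class PollingWorkerSketch {

    // final: the reference can never be reassigned, so all threads
    // that synchronize on it are guaranteed to use the same monitor.
    private final Object lock = new Object();

    private int polls;

    void poll() {
        synchronized (lock) {
            polls++;
        }
    }

    int pollCount() {
        synchronized (lock) {
            return polls;
        }
    }

    // Starts several threads that poll concurrently and returns the final count.
    static int pollConcurrently(int threads, int pollsPerThread) {
        PollingWorkerSketch worker = new PollingWorkerSketch();
        Thread[] pollers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            pollers[t] = new Thread(() -> {
                for (int i = 0; i < pollsPerThread; i++) {
                    worker.poll();
                }
            });
            pollers[t].start();
        }
        try {
            for (Thread p : pollers) {
                p.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for pollers", e);
        }
        return worker.pollCount();
    }

    public static void main(String[] args) {
        System.out.println(pollConcurrently(4, 10_000)); // 40000
    }
}
```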
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/466#issuecomment-81777035 Hi @mxm, appreciate the ACK. Just trying to keep everyone informed with more code coming in. It is harder to read other people's code and flow, so I was just trying to ease the process =)
[GitHub] flink pull request: [FLINK-1695] Kick off of Flink's machine learn...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/479#discussion_r26526474 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common + +import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat} +import org.apache.flink.api.scala.DataSet +import org.apache.flink.core.fs.FileSystem.WriteMode +import org.apache.flink.core.fs.Path + +import scala.reflect.ClassTag + +/** + * Collection of convenience functions + */ +object FlinkTools { + + /** + * --- End diff -- Missing JavaDoc comments
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/466#issuecomment-79188765 Thanks @mxm! I added a couple more comments about documentation. I am trying to promote the habit of adding a JavaDoc comment to new classes that explains why each one had to be created. Removing code is harder than adding new code.
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26407753 --- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupCombineProperties.java --- @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.compiler.operators; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.compiler.dag.SingleInputNode; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.PartitioningProperty; +import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties; +import org.apache.flink.compiler.dataproperties.RequestedLocalProperties; +import org.apache.flink.compiler.plan.Channel; +import org.apache.flink.compiler.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +public final class GroupCombineProperties extends OperatorDescriptorSingle { --- End diff -- Could you add JavaDoc for this class? I know the other subclasses of OperatorDescriptorSingle do not have JavaDoc yet, but I am trying to get new code to include at least a simple explanation of why the class was created.
[GitHub] flink pull request: Remove -j and -a parameters which seemed no lo...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/482#issuecomment-78991357 The issue is that those parameters are not in the command-line docs. Or maybe I misread? Removing unnecessary parameters will make it easier to read. On Friday, March 13, 2015, Fabian Hueske notificati...@github.com wrote: It's true, the parameters are not necessarily needed, but they don't do any harm either. In fact, I like to specify parameters explicitly. I do not see a need to break the API of the CLI client or a substantial gain. (https://github.com/apache/flink/pull/482#issuecomment-78868258)
[GitHub] flink pull request: Remove -j and -a parameters which seemed no lo...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/482#issuecomment-79140904 Thanks for the input @fhueske. I am trying to make all the examples consistent, and removing the parameters seems consistent with the examples on the command line page. So if there are no more objections, I can merge the change by EOD.
[GitHub] flink pull request: Kick off of Flink's machine learning library
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/479#issuecomment-78541637 @tillrohrmann, could you file a JIRA for this one? It should help when we want to manage releases or merge between branches.
[GitHub] flink pull request: [FLINK-1605] Bundle all hadoop dependencies an...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/454#issuecomment-78560058 How much time does this new shading add to the total compile? It used to be around 16-18 mins for me using mvn clean install -DskipTests. I merged today, and after more than 25 mins the build still has not completed =(
[GitHub] flink pull request: Remove -j and -a parameters which seemed no lo...
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/482 Remove -j and -a parameters which seemed no longer valid in the doc example for YARN From: ./bin/flink run -j ./examples/flink-java-examples-{{site.FLINK_VERSION_SHORT }}-WordCount.jar \ -a 1 hdfs:////apache-license-v2.txt hdfs:///.../wordcount-result.txt To: ./bin/flink run ./examples/flink-java-examples-{{site.FLINK_VERSION_SHORT }}-WordCount.jar \ hdfs:////apache-license-v2.txt hdfs:///.../wordcount-result.txt Looking at http://ci.apache.org/projects/flink/flink-docs-master/cli.html, it seems like -j and -a are no longer valid? You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink fix_doc_run_onyarn_params Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/482.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #482 commit 1ae0c5779b0c8b64feb9fd5f00b51a9b83cd4e0e Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-03-12T23:18:03Z Remove -j and -a parameters which seemed no longer valid in the doc example for submit job to Flink run in YARN. From: ./bin/flink run -j ./examples/flink-java-examples-{{site.FLINK_VERSION_SHORT }}-WordCount.jar \ -a 1 hdfs:////apache-license-v2.txt hdfs:///.../wordcount-result.txt To: ./bin/flink run ./examples/flink-java-examples-{{site.FLINK_VERSION_SHORT }}-WordCount.jar \ hdfs:////apache-license-v2.txt hdfs:///.../wordcount-result.txt
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26064533 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReducePartialOperatorBase.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators.base; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * --- End diff -- @mxm, could you add a class description explaining why the base class needs to be created and how it relates to the existing ones? We would love to add more documentation to the code to help new contributors get comfortable with the code flow.
[GitHub] flink pull request: Fix checking null for ternary operator check o...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/461#issuecomment-77796550 Thx @StephanEwen, will merge this tomorrow unless someone beats me to it =)
[GitHub] flink pull request: [FLINK-1651] Fix test case at JobManagerStartu...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/460#issuecomment-77796609 Thx @StephanEwen, will merge this tomorrow unless someone else beats me to it.
[GitHub] flink pull request: Fix checking null for ternary operator check o...
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/461 Fix checking null for ternary operator check on Exception#getMessage calls Add parentheses on Exception#getMessage calls from pattern of: "Initializing the input processing failed" + e.getMessage() == null ? "." : ": " + e.getMessage() to: "Initializing the input processing failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()) The extra parentheses are needed so that the ternary operator's null check applies only to the e.getMessage() call. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink fix_parentheses_exception_getmessage Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/461.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #461 commit 1345cbf38cdbf849bdc7c0ff7e29d02fa00bc8fa Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-03-07T01:50:57Z Fix checking null for Exception#getMessage call from pattern of: "Initializing the input processing failed" + e.getMessage() == null ? "." : ": " + e.getMessage() to: "Initializing the input processing failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()) Extra parentheses needed to make sure ternary operator check on e.getMessage scope call.
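The precedence problem this PR fixes can be reproduced in a small, self-contained Java class. The class and method names are illustrative, and the message prefix is taken from the PR text. In Java, `+` binds tighter than `==`, and `==` binds tighter than `?:`. The unparenthesized version therefore compares the whole concatenated string with `null`. That comparison is always false, so only `": " + e.getMessage()` survives and the prefix is lost.

```java
// Illustrates how + binds tighter than == and ?: in Java (names are illustrative).
final class TernaryPrecedenceDemo {

    static final String PREFIX = "Initializing the input processing failed";

    // Parses as ((PREFIX + msg) == null) ? "." : (": " + msg).
    // The concatenation is never null, so the prefix is always dropped.
    static String withoutParentheses(Exception e) {
        return PREFIX + e.getMessage() == null ? "." : ": " + e.getMessage();
    }

    // The parentheses scope the null check to e.getMessage() only.
    static String withParentheses(Exception e) {
        return PREFIX + (e.getMessage() == null ? "." : ": " + e.getMessage());
    }

    public static void main(String[] args) {
        Exception noMessage = new Exception();            // getMessage() returns null
        Exception withMessage = new Exception("disk full");
        System.out.println(withoutParentheses(noMessage));   // : null
        System.out.println(withoutParentheses(withMessage)); // : disk full
        System.out.println(withParentheses(noMessage));      // Initializing the input processing failed.
        System.out.println(withParentheses(withMessage));    // Initializing the input processing failed: disk full
    }
}
```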
[GitHub] flink pull request: Some simple cleanups and doc updates while loo...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/457#issuecomment-77469917 Thx @tillrohrmann, @uce for the review! Will merge once it passes the build
[GitHub] flink pull request: [FLINK-1640] Remove tailing slash from paths.
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/453#discussion_r25801300 --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/Path.java --- @@ -217,7 +211,17 @@ public Path(String pathString) { *the path string */ public Path(String scheme, String authority, String path) { - checkPathArg(path); + + if(path == null) { --- End diff -- +1 to what Ufuk recommended. This would reduce code duplication.
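Here is a hypothetical sketch of the point about duplication. When every constructor routes its argument check through one helper, as the removed `checkPathArg(path)` call did, the null check lives in a single place. Whether this matches Ufuk's exact recommendation is an assumption. The class below is a simplified stand-in, not Flink's `Path`, and its error messages are made up.

```java
// Hypothetical, simplified stand-in for a Path-like class; not Flink's org.apache.flink.core.fs.Path.
final class PathSketch {

    private final String scheme;
    private final String authority;
    private final String path;

    PathSketch(String path) {
        this(null, null, path); // delegate so validation happens in one constructor
    }

    PathSketch(String scheme, String authority, String path) {
        checkPathArg(path); // single place for argument validation
        this.scheme = scheme;
        this.authority = authority;
        this.path = path;
    }

    // Shared validation helper (hypothetical messages).
    private static void checkPathArg(String path) {
        if (path == null) {
            throw new IllegalArgumentException("path must not be null");
        }
        if (path.isEmpty()) {
            throw new IllegalArgumentException("path must not be empty");
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        if (scheme != null) {
            sb.append(scheme).append("://");
            if (authority != null) {
                sb.append(authority);
            }
        }
        return sb.append(path).toString();
    }
}
```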
[GitHub] flink pull request: Fix typos in iterations.md file
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/443#issuecomment-76667900 Thx for the review, will merge this soon
[GitHub] flink pull request: Fix typos in iterations.md file
GitHub user hsaputra opened a pull request: https://github.com/apache/flink/pull/443 Fix typos in iterations.md file -) Remove extra solution word. -) Change propagete to propagate. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hsaputra/flink fix_typo_in_iterations_md Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/443.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #443 commit fd781cc81fe1a9efcb5ad963dedc4571b976d8b7 Author: Henry Saputra henry.sapu...@gmail.com Date: 2015-02-28T01:22:57Z Fix typos in iterations.md file: -) Remove extra solution word. -) Change propagete to propagate.
[GitHub] flink pull request: [FLINK-1526] Added MinSpanningTree example, li...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/434#discussion_r25399885 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MinSpanningTreeData.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; + +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +import java.util.ArrayList; +import java.util.List; + +public class MinSpanningTreeData { --- End diff -- Could you add a Javadoc description of what this class does and how it relates to other classes?
[GitHub] flink pull request: [FLINK-1526] Added MinSpanningTree example, li...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/434#discussion_r25399853 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MinSpanningTreeExample.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; + +import org.apache.flink.graph.example.utils.MinSpanningTreeData; +import org.apache.flink.graph.library.MinSpanningTree; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple3; + +public class MinSpanningTreeExample implements ProgramDescription { --- End diff -- Could you add a Javadoc description of what this class does, along with sample output? We are trying to encourage contributors to add more comments and descriptions to keep up with the new code coming in.
[GitHub] flink pull request: fixed package statement
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/437#issuecomment-75822423 +1 LGTM How can the Java compiler miss this? I am confused =( My IntelliJ IDEA shows a red wavy line indicating the package name mismatch.
[GitHub] flink pull request: [FLINK-1568] [FLINK-1604] [FLINK-1606] [FLINK-...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/436#issuecomment-75861629 W000t! +1 @uce =)
[GitHub] flink pull request: [FLINK-1596] remove space in filename
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/431#issuecomment-75414784 +1 LGTM
[GitHub] flink pull request: [FLINK-1501] Add metrics library for monitorin...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/421#issuecomment-75091767 You are the man, Robert!
[GitHub] flink pull request: Remove extra space after open parenthesis in I...
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/416

Remove extra space after open parenthesis in InstanceConnectionInfo#toString

Small update to remove the extra space after the opening parenthesis in InstanceConnectionInfo#toString, to be consistent with other messages and toString calls.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink fix_extra_space_in_InstanceConnectionInfo_tostring

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/416.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #416

commit cc175ae2f3e3a2e7b00ed1207f215487cdc38c2f
Author: Henry Saputra <henry.sapu...@gmail.com>
Date: 2015-02-18T18:11:38Z

Remove extra space after open parenthesis in InstanceConnectionInfo#toString.
[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/417#discussion_r24931138

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---
@@ -52,26 +57,88 @@
     /**
      * Constructs a new IOManager.
      *
-     * @param paths
-     *        the basic directory paths for files underlying anonymous channels.
+     * @param tempDirs The basic directories for files underlying anonymous channels.
      */
-    protected IOManager(String[] paths) {
-        this.paths = paths;
+    protected IOManager(String[] tempDirs) {
+        if (tempDirs == null || tempDirs.length == 0) {
+            throw new IllegalArgumentException("The temporary directories must not be null or empty.");
+        }
+
         this.random = new Random();
         this.nextPath = 0;
+
+        this.paths = new File[tempDirs.length];
+        for (int i = 0; i < tempDirs.length; i++) {
+            File baseDir = new File(tempDirs[i]);
+            String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
+            File storageDir = new File(baseDir, subfolder);
+
+            if (!storageDir.exists() && !storageDir.mkdirs()) {
+                throw new RuntimeException(
+                        "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
+            }
+            paths[i] = storageDir;
+            LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
+        }
+
+        this.shutdownHook = new Thread("I/O manager shutdown hook") {
+            @Override
+            public void run() {
+                shutdown();
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
     }
     /**
-     * Close method, marks the I/O manager as closed.
+     * Close method, marks the I/O manager as closed
+     * and removed all temporary files.
      */
-    public abstract void shutdown();
+    public void shutdown() {
+        // remove all of our temp directories
+        for (File path : paths) {
+            try {
+                if (path != null) {
+                    if (path.exists()) {
+                        FileUtils.deleteDirectory(path);
+                        LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
+            }
+        }
+
+        // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+        if (shutdownHook != Thread.currentThread()) {
+            try {
+                Runtime.getRuntime().removeShutdownHook(shutdownHook);
+            }
+            catch (IllegalStateException e) {
+                // race, JVM is in shutdown already, we can safely ignore this
+            }
+            catch (Throwable t) {
+                LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
+            }
+        }
+    }
     /**
      * Utility method to check whether the IO manager has been properly shut down.
+     * For this base implementation, this means that all files have been removed.
      *
      * @return True, if the IO manager has properly shut down, false otherwise.
      */
-    public abstract boolean isProperlyShutDown();
+    public boolean isProperlyShutDown() {
+        for (File path : paths) {
+            if (path != null) {
--- End diff --

Would this be easier to read with the check {{if (path != null && path.exists())}}?
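A minimal, JDK-only sketch of the temp-directory lifecycle under review, including the combined `path != null && path.exists()` check from the comment. The class name `SpillDirectories` and the `directories()` accessor are hypothetical. `Files.walk` stands in for commons-io `FileUtils.deleteDirectory`, and logging is omitted, so this is an illustration, not Flink's IOManager.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical, JDK-only sketch of the IOManager temp-directory lifecycle.
class SpillDirectories {
    private final File[] paths;
    private final Thread shutdownHook;

    SpillDirectories(String[] tempDirs) {
        if (tempDirs == null || tempDirs.length == 0) {
            throw new IllegalArgumentException("The temporary directories must not be null or empty.");
        }
        paths = new File[tempDirs.length];
        for (int i = 0; i < tempDirs.length; i++) {
            // One unique sub-directory per base directory: <base>/flink-io-<uuid>
            File storageDir = new File(tempDirs[i], "flink-io-" + UUID.randomUUID());
            if (!storageDir.exists() && !storageDir.mkdirs()) {
                throw new RuntimeException("Could not create storage directory: " + storageDir.getAbsolutePath());
            }
            paths[i] = storageDir;
        }
        // Clean up even if the JVM exits without an explicit shutdown() call.
        shutdownHook = new Thread(this::shutdown, "I/O manager shutdown hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    void shutdown() {
        for (File path : paths) {
            if (path != null && path.exists()) {
                deleteRecursively(path.toPath());
            }
        }
        // Unregister the hook, unless this call is running inside the hook itself.
        if (Thread.currentThread() != shutdownHook) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down; the hook cannot be removed, which is fine.
            }
        }
    }

    // The single combined condition suggested in the review comment.
    boolean isProperlyShutDown() {
        for (File path : paths) {
            if (path != null && path.exists()) {
                return false;
            }
        }
        return true;
    }

    File[] directories() {
        return paths.clone();
    }

    private static void deleteRecursively(Path root) {
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order visits children before their parent directories.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            // Best-effort cleanup; a real implementation would log this.
        }
    }
}
```

Calling `shutdown()` twice is harmless here. The second call finds no directories, and `removeShutdownHook` simply returns false for a hook that is no longer registered.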
[GitHub] flink pull request: [FLINK-1483] IOManager puts temp files in dedi...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/417#issuecomment-74930615 Since the IOManager adds a shutdown hook to clean up the files, should IOManagerAsync#isProperlyShutDown also call super.isProperlyShutDown()?
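A minimal sketch of what calling the superclass check would look like. The subclass override ANDs its own condition (worker threads stopped) with `super.isProperlyShutDown()` (temp files removed) instead of replacing it. `BaseIOManager` and `AsyncIOManager` are hypothetical stand-ins, not the real IOManager and IOManagerAsync classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: "properly shut down" means the temp files are gone.
class BaseIOManager {
    private boolean tempFilesRemoved;

    void shutdown() {
        tempFilesRemoved = true; // stands in for deleting the spill directories
    }

    boolean isProperlyShutDown() {
        return tempFilesRemoved;
    }
}

// Hypothetical async subclass that additionally owns I/O worker threads.
class AsyncIOManager extends BaseIOManager {
    private final List<Thread> workers = new ArrayList<>();

    AsyncIOManager(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            Thread worker = new Thread(() -> { /* a real worker would process I/O requests */ });
            worker.start();
            workers.add(worker);
        }
    }

    @Override
    void shutdown() {
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.shutdown(); // keep the base-class cleanup of temp files
    }

    @Override
    boolean isProperlyShutDown() {
        boolean workersStopped = true;
        for (Thread worker : workers) {
            workersStopped &= !worker.isAlive();
        }
        // AND the subclass condition with the base-class condition rather than replacing it.
        return workersStopped && super.isProperlyShutDown();
    }
}
```

If the override omitted `super.isProperlyShutDown()`, it could report a clean shutdown while temp files still exist.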
[GitHub] flink pull request: [FLINK-1542] Test case at BlobUtilsTest should...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/396#issuecomment-74558015 Thanks for the review, Ufuk and Stephan. Also thanks to Stephan for merging it.
[GitHub] flink pull request: [FLINK-1543] Adds better exception handling in...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/394#issuecomment-74388302 No worries, thanks for replying to my concern =)
[GitHub] flink pull request: [FLINK-1543] Adds better exception handling in...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/394#discussion_r24689710

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -517,67 +521,83 @@ protected int list(String[] args) {
             return 1;
         }
-        Iterable<ExecutionGraph> jobs = AkkaUtils.<RunningJobs>ask(jobManager,
-                RequestRunningJobs$.MODULE$, getAkkaTimeout()).asJavaIterable();
+        final Future<Object> response = Patterns.ask(jobManager,
+                JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout()));
-        ArrayList<ExecutionGraph> runningJobs = null;
-        ArrayList<ExecutionGraph> scheduledJobs = null;
-        if (running) {
-            runningJobs = new ArrayList<ExecutionGraph>();
-        }
-        if (scheduled) {
-            scheduledJobs = new ArrayList<ExecutionGraph>();
+        Object result = null;
+
+        try{
--- End diff --

More of a style nit: most code in Flink adds a space before '{'.
[GitHub] flink pull request: [FLINK-1543] Adds better exception handling in...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/394#discussion_r24690375

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---
@@ -486,7 +489,8 @@ protected void run() {
         public void onReceive(Object message) throws Exception {
             if(message instanceof RegistrationMessages.RegisterTaskManager){
                 final InstanceID iid = new InstanceID();
-                getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1),
+                getSender().tell(new RegistrationMessages.AcknowledgeRegistration(iid, -1,
+                        Option.<ActorRef>apply(null)),
--- End diff --

Just curious, why not just pass None instead of {{Option.<ActorRef>apply(null)}}?
[GitHub] flink pull request: [FLINK-1542] Test case at BlobUtilsTest should...
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/396

[FLINK-1542] Test case at BlobUtilsTest should not assume user could not create new item in root directory

Sometimes the user running the tests has write access to the root directory, so creating /cannot-create-this succeeds and no exception is thrown. The test should instead construct a Flink test directory under the directory specified by java.io.tmpdir and change its permissions so that new directories cannot be created in it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink FLINK-1542_fix_BlobUtilsTest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/396.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #396

commit c56044ef5cd072a3684dfe7b723f359e0642f04e
Author: Henry Saputra <henry.sapu...@gmail.com>
Date: 2015-02-13T21:44:29Z

[FLINK-1542] Test case at BlobUtilsTest should not assume user could not create new item in root directory

Sometimes, user that run tests could have write access to root dir such as creating /cannot-create-this is possible, hence to exception thrown. Need to construct a Flink test directory under directory specified under java.io.tmpdir and change the permission to not allow create new directory.
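A minimal sketch of the approach described in the PR, using only `java.io.File`. The helper name `ReadOnlyDirCheck`, the `flink-test-` prefix and the `Outcome` enum are hypothetical and not taken from the actual BlobUtilsTest. A privileged user (e.g. root) may still be able to create entries in a directory without write permission, which is the same problem the PR describes for `/`. For that reason the sketch reports that case rather than failing. In a JUnit test, that case would typically be handled with `org.junit.Assume`, so the test is skipped rather than failed.

```java
import java.io.File;
import java.util.UUID;

// Hypothetical helper illustrating the idea; not the actual BlobUtilsTest code.
class ReadOnlyDirCheck {
    enum Outcome { REFUSED, NOT_ENFORCED }

    // Creates <java.io.tmpdir>/flink-test-<uuid>, removes write permission and
    // tries to create a child directory inside it.
    static Outcome tryCreateChildInReadOnlyDir() {
        File testDir = new File(System.getProperty("java.io.tmpdir"), "flink-test-" + UUID.randomUUID());
        if (!testDir.mkdirs()) {
            throw new IllegalStateException("Could not create " + testDir.getAbsolutePath());
        }
        File child = new File(testDir, "cannot-create-this");
        try {
            if (!testDir.setWritable(false, false)) {
                return Outcome.NOT_ENFORCED; // file system does not support the permission change
            }
            // A privileged user may still succeed here, the same issue as with "/".
            return child.mkdir() ? Outcome.NOT_ENFORCED : Outcome.REFUSED;
        } finally {
            // Restore write permission so the test directory can be cleaned up.
            testDir.setWritable(true, false);
            child.delete();
            testDir.delete();
        }
    }
}
```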
[GitHub] flink pull request: Remove unused enum values from Aggregations en...
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/373

Remove unused enum values from Aggregations enum.

Simple cleanup to remove unused enum values from the Aggregations enum.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink remove_unused_enumvalues_from_aggregations

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/373.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #373

commit a831e448c7d0558f0c239eab4a2b89b54facd7c2
Author: Henry Saputra <henry.sapu...@gmail.com>
Date: 2015-02-06T18:09:20Z

Remove unused enum values from Aggregations enum.
[GitHub] flink pull request: [FLINK-1484] Adds explicit disconnect message ...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/368#discussion_r24220454

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -125,6 +126,10 @@ Actor with ActorLogMessages with ActorLogging {
   override def postStop(): Unit = {
     log.info(s"Stopping job manager ${self.path}.")
+    // disconnect the registered task managers
+    instanceManager.getAllRegisteredInstances.asScala.foreach{
+      _.getTaskManager ! Disconnected("JobManager is stopping")}
+
     for((e,_) <- currentJobs.values){
       e.fail(new Exception("The JobManager is shutting down."))
--- End diff --

Since we are cleaning up messages, maybe remove "The" so it is consistent with other messages.
[GitHub] flink pull request: [FLINK-1442] Reduce memory consumption of arch...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/344#discussion_r24097257

--- Diff: flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---
@@ -31,10 +31,11 @@ trait TestingMemoryArchivist extends ActorLogMessages {
   def receiveTestingMessages: Receive = {
     case RequestExecutionGraph(jobID) =>
-      graphs.get(jobID) match {
-        case Some(executionGraph) => sender ! ExecutionGraphFound(jobID, executionGraph)
-        case None => sender ! ExecutionGraphNotFound(jobID)
+      val executionGraph = getGraph(jobID)
+      if (executionGraph != null) {
--- End diff --

I like @tillrohrmann's suggestion to use Option as an alternative to null. In Java land, Guava's Optional can be used to do a similar thing (and Java 8 includes a similar java.util.Optional).
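In Java 8 the same idea can be expressed with `java.util.Optional`. The following is a minimal sketch in which the `GraphArchive` class is hypothetical, and plain `String`s stand in for JobID and ExecutionGraph. The reply strings mirror the ExecutionGraphFound / ExecutionGraphNotFound messages in the diff. Guava's `com.google.common.base.Optional` offers a comparable `fromNullable` / `or` style API for older Java versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical archive keyed by job ID; Strings stand in for JobID and ExecutionGraph.
class GraphArchive {
    private final Map<String, String> graphs = new HashMap<>();

    void put(String jobId, String graph) {
        graphs.put(jobId, graph);
    }

    // Returns Optional.empty() instead of null when the job is unknown.
    Optional<String> getGraph(String jobId) {
        return Optional.ofNullable(graphs.get(jobId));
    }

    // Callers handle both cases explicitly, without a null check.
    String reply(String jobId) {
        return getGraph(jobId)
                .map(graph -> "ExecutionGraphFound(" + jobId + ", " + graph + ")")
                .orElse("ExecutionGraphNotFound(" + jobId + ")");
    }
}
```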
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/202#discussion_r23973573

--- Diff: flink-addons/flink-language-binding/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py ---
@@ -0,0 +1,247 @@
+
--- End diff --

No, you cannot. I believe Spark had a special license deal, from when it was part of AMPLab, that allowed them to get a license friendly to the Apache license.

- Henry

On Mon, Feb 2, 2015 at 11:52 AM, Robert Metzger <notificati...@github.com> wrote:

> In flink-addons/flink-language-binding/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
> https://github.com/apache/flink/pull/202#discussion_r23952693:
>
> > @@ -0,0 +1,247 @@
> > +
>
> We need to find another solution here with the licenses. I think we can not just re-distribute this file with our license.
[GitHub] flink pull request: Added ResultTypeQueryable interface to TypeSer...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/349#issuecomment-72262547 @aalexandrov, could you file a JIRA issue to associate this PR with? I need it for tracking purposes and releases. Thanks!
[GitHub] flink pull request: [FLINK-1464] Add ResultTypeQueryable interface...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/349#issuecomment-72291369 @aalexandrov, cool! Thanks.
[GitHub] flink pull request: [FLINK-1460] fix typos
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/346#issuecomment-72111491 @coderxiang, ah, the tests do the comparison in the after method, so it is hard to figure out which method failed. But it seems the other builds passed, and I am sure that your change does not cause this problem.
[GitHub] flink pull request: [FLINK-1460] fix typos
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/346#issuecomment-72109489 Hi @aljoscha, thanks. If you can merge it before tonight, please do. Otherwise I can do it later tonight if you don't have time to get to this one =)
[GitHub] flink pull request: Improved exception for missing type of InputFo...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/351#issuecomment-72162452 To reduce code duplication and tracing, we could just throw the exception inside createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) by wrapping the IllegalArgumentException in an InvalidProgramException:

{code}
public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) {
    try {
        Preconditions.checkArgument(inputFormat != null, "InputFormat must not be null.");
        Preconditions.checkArgument(producedType != null, "Produced type information must not be null.");
    } catch (IllegalArgumentException iaex) {
        throw new InvalidProgramException(
            "The type returned by the input format could not be automatically determined. " +
            "Please specify the TypeInformation of the produced type explicitly by using " +
            "the 'createInput(InputFormat, TypeInformation)' method instead.", iaex);
    }
    return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
}
{code}

That way, callers of the other createInput methods don't have to deal with the exception.
[GitHub] flink pull request: fix typos
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/346#issuecomment-71948299 +1 LGTM
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23485615

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
+          log.debug("The TaskManager {} is already registered at the JobManager {}, but received " +
+            "another AcknowledgeRegistration message.", self.path, currentJobManager.path)
+        }
+      }
+    }
-        heartbeatScheduler = Some(context.system.scheduler.schedule(
-          TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat))
+    case AlreadyRegistered(id, blobPort) =>
+      if(!registered) {
+        log.warning("The TaskManager {} seems to be already registered at the JobManager {} even " +
+          "though it has not yet finished the registration process.", self.path, sender.path)
-        profiler foreach {
-          _.tell(RegisterProfilingListener, JobManager.getProfiler(currentJobManager))
+        finishRegistration(id, blobPort)
+        registered = true
+      } else {
+        // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled){
+          log.debug("The TaskManager {} has already been registered at the JobManager {}.",
+            self.path, sender.path)
         }
+      }
-        for (listener <- waitForRegistration) {
-          listener ! RegisteredAtJobManager
-        }
+    case RefuseRegistration(reason) =>
+      if(!registered) {
+        log.error("The registration of task manager {} was refused by the job manager {} " +
+          "because {}.", self.path, jobManagerAkkaURL, reason)
-        waitForRegistration.clear()
+        // Shut task manager down
+        self ! PoisonPill
+      } else {
+        // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+        if(log.isDebugEnabled) {
--- End diff --
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/328#discussion_r23485570

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -175,62 +175,79 @@ import scala.collection.JavaConverters._
   }
   private def tryJobManagerRegistration(): Unit = {
-    registrationAttempts = 0
-    import context.dispatcher
-    registrationScheduler = Some(context.system.scheduler.schedule(
-      TaskManager.REGISTRATION_DELAY, TaskManager.REGISTRATION_INTERVAL,
-      self, RegisterAtJobManager))
+    registrationDuration = 0 seconds
+
+    registered = false
+
+    context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
   }
   override def receiveWithLogMessages: Receive = {
     case RegisterAtJobManager => {
-      registrationAttempts += 1
+      if(!registered) {
+        registrationDuration += registrationDelay
+        // double delay for exponential backoff
+        registrationDelay *= 2
-      if (registered) {
-        registrationScheduler.foreach(_.cancel())
-      }
-      else if (registrationAttempts <= TaskManager.MAX_REGISTRATION_ATTEMPTS) {
+        if (registrationDuration > maxRegistrationDuration) {
+          log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL,
-        log.info("Try to register at master {}. Attempt #{}", jobManagerAkkaURL,
-          registrationAttempts)
-        val jobManager = context.actorSelection(jobManagerAkkaURL)
+            maxRegistrationDuration)
-        jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
-      }
-      else {
-        log.error("TaskManager could not register at JobManager.");
-        self ! PoisonPill
+          self ! PoisonPill
+        } else if (!registered) {
+          log.info(s"Try to register at master ${jobManagerAkkaURL}. ${registrationAttempts}. " +
+            s"Attempt")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+
+          jobManager ! RegisterTaskManager(connectionInfo, hardwareDescription, numberOfSlots)
+
+          context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager)
+        }
       }
     }
     case AcknowledgeRegistration(id, blobPort) => {
-      if (!registered) {
+      if(!registered) {
+        finishRegistration(id, blobPort)
         registered = true
-        currentJobManager = sender
-        instanceID = id
-
-        context.watch(currentJobManager)
-
-        log.info("TaskManager successfully registered at JobManager {}.",
-          currentJobManager.path.toString)
-
-        setupNetworkEnvironment()
-        setupLibraryCacheManager(blobPort)
+      } else {
+        if (log.isDebugEnabled) {
--- End diff --

Small nit: with slf4j formatting we do not need to check isDebugEnabled anymore, because the parameterized messages feature checks the log level before materializing the string. It will keep the code cleaner =)
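The following toy logger sketches why parameterized messages make the guard unnecessary. `ToyLogger` is hypothetical and is not slf4j's implementation. It only illustrates that the level check happens before the `{}` placeholders are filled in. Note that the arguments themselves (e.g. `self.path`) are still evaluated at the call site, so an explicit guard can still pay off when computing an argument is expensive. The actor code above calls `log.warning`, which suggests Akka's LoggingAdapter rather than slf4j. As far as I know, that adapter also provides `{}`-template overloads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy logger, only to illustrate deferred message formatting.
class ToyLogger {
    private final boolean debugEnabled;
    final List<String> lines = new ArrayList<>();
    int formatCalls = 0; // counts how often a message string was actually built

    ToyLogger(boolean debugEnabled) {
        this.debugEnabled = debugEnabled;
    }

    boolean isDebugEnabled() {
        return debugEnabled;
    }

    void debug(String template, Object... args) {
        if (!debugEnabled) {
            return; // level check happens before any string is built
        }
        lines.add(format(template, args));
    }

    // Replaces each "{}" with the next argument, in order.
    private String format(String template, Object... args) {
        formatCalls++;
        StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = template.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            sb.append(template, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        sb.append(template.substring(from));
        return sb.toString();
    }
}
```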
[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/324#issuecomment-71119613 Cool, thanks @StephanEwen. If no one beats me to merging it, I will do it by EOD today.
[GitHub] flink pull request: [FLINK-1352] [runtime] Fix buggy registration ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/328#issuecomment-71148024 Thanks for the explanation, @tillrohrmann. +1 for the exponential backoff approach. We can make the max retries and the max delay for each try configurable properties.
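A minimal sketch of the configurable schedule proposed in this comment. The delay doubles on every attempt, each single wait is capped, and the number of retries is bounded. The class `BackoffSchedule` and the parameters `initialDelay`, `maxDelayPerTry` and `maxRetries` are hypothetical illustrations, not existing Flink configuration keys.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the parameters are illustrative, not Flink configuration keys.
final class BackoffSchedule {
    private BackoffSchedule() {}

    static List<Duration> delays(Duration initialDelay, Duration maxDelayPerTry, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        List<Duration> result = new ArrayList<>();
        Duration delay = initialDelay;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            // Never wait longer than the configured per-try maximum.
            Duration capped = delay.compareTo(maxDelayPerTry) > 0 ? maxDelayPerTry : delay;
            result.add(capped);
            // Exponential backoff: double the (capped) delay for the next attempt,
            // which also keeps the value from growing without bound.
            delay = capped.multipliedBy(2);
        }
        return result;
    }
}
```

For example, an initial delay of 500 ms, a 5 s cap and 6 retries yield 0.5 s, 1 s, 2 s, 4 s, 5 s, 5 s. The TaskManager diff in this PR bounds the accumulated registration time (`maxRegistrationDuration`) instead of counting retries. Either bound fits the same doubling loop.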
[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/324#issuecomment-71134892 As per the recommendation from @StephanEwen, I will not merge this to 0.8 until we need to cherry-pick fixes related to these files.
[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/324#issuecomment-71079720 Can I get a +1 for this one?
[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/324#issuecomment-70715529 I renamed coGroupDataSet.scala to CoGroupDataSet.scala and crossDataSet.scala to CrossDataSet.scala to follow the Scala file naming convention. I also moved the UnfinishedCoGroupOperation class out into its own file, because it is a high-level public class by itself and is not tied to CoGroupOperation as a sealed trait.
[GitHub] flink pull request: Rename coGroupDataSet.scala to CoGroupDataSet....
GitHub user hsaputra opened a pull request:

https://github.com/apache/flink/pull/324

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala

This PR contains changes to follow Scala style:
-) Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala
-) Move the UnfinishedCoGroupOperation class into its own Scala file

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hsaputra/flink rename_coGroupDataSet_filename

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/324.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #324

commit 85d0dbfb506b954c53ece5ff8f825df5fbde1ed8
Author: Henry Saputra <henry.sapu...@gmail.com>
Date: 2015-01-19T22:52:30Z

Rename coGroupDataSet.scala to CoGroupDataSet.scala, and crossDataSet.scala to CrossDataSet.scala

commit fa9f37c189e397458df4afd89af4a0025373ec84
Author: Henry Saputra <henry.sapu...@gmail.com>
Date: 2015-01-19T23:29:51Z

Move the UnfinishedCoGroupOperation class into its own Scala file.

The UnfinishedCoGroupOperation does not relate closely to CoGroupOperation via sealed modifier so per Scala style guide [1] I propose to move it to separate file.

[1] http://docs.scala-lang.org/style/files.html
[GitHub] flink pull request: Update incubator-flink name in the merge pull ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/313#issuecomment-70284303 Thanks @rmetzger, will merge this today. It is not a blocker for 0.8, so I will not merge it into the 0.8 branch.
[GitHub] flink pull request: FLINK-1402 - Remove Serializable extends from ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/306#issuecomment-69967270 Ah ok, thanks for the info Stephen, good to know it was intentional. Do you want to keep this pattern?
[GitHub] flink pull request: [FLINK-1183] Generate gentle notification mess...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/296#issuecomment-69971212 +1 gentler and informative =)
[GitHub] flink pull request: FLINK-1402 - Remove Serializable extends from ...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/306#issuecomment-69992202 I don't remember if there is any best practice about this, so if we think it is useful we could keep this style and maybe document it. But I don't think it is good practice for other interfaces.
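The pattern discussed in FLINK-1402 is an interface that itself extends `java.io.Serializable`. A minimal sketch of why a framework might do this on purpose, assuming plain Java serialization is used to ship user functions: every implementation, including a lambda whose target type is such an interface, becomes serializable without declaring it, while the same logic behind an interface that does not extend `Serializable` fails with `NotSerializableException`. The interface and class names (`MapFunction`, `PlainFunction`, `AddOne`, `PlainAddOne`) are hypothetical and are not Flink's actual API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableInterfaceDemo {

    // Hypothetical user-function interface that extends Serializable,
    // so every implementation inherits serializability.
    interface MapFunction<T, R> extends Serializable {
        R map(T value);
    }

    // Hypothetical interface that does NOT extend Serializable.
    interface PlainFunction<T, R> {
        R map(T value);
    }

    // Serializable only because MapFunction extends Serializable.
    static class AddOne implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value + 1;
        }
    }

    // Same logic, but nothing in its type hierarchy is Serializable.
    static class PlainAddOne implements PlainFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) {
            return value + 1;
        }
    }

    // Returns true if Java serialization can write the object to bytes.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        // The lambda's target type extends Serializable, so javac makes it serializable.
        MapFunction<Integer, Integer> doubler = x -> x * 2;
        System.out.println(isSerializable(new AddOne()));      // true
        System.out.println(isSerializable(doubler));           // true
        System.out.println(isSerializable(new PlainAddOne())); // false
    }
}
```

The flip side, in line with the comment above, is that every implementer of such an interface inherits the serialization contract whether it needs it or not, which is why the pattern may be worth documenting where it is intentional rather than applying it to interfaces in general.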