[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727598#comment-14727598
 ] 

ASF GitHub Bot commented on FLINK-2373:
---------------------------------------

Github user akunft commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38554964
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + 
t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new 
TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new 
ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    Yes, I just ensure that it is caused by the wrong Akka setting.


> Add configuration parameter to createRemoteEnvironment method
> -------------------------------------------------------------
>
>                 Key: FLINK-2373
>                 URL: https://issues.apache.org/jira/browse/FLINK-2373
>             Project: Flink
>          Issue Type: Bug
>          Components: other
>            Reporter: Andreas Kunft
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of 
> a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default value for the 
> max. payload size in Akka, as we cannot increase the configuration value 
> (akka.remote.OversizedPayloadException: Discarding oversized payload...).
> Providing an overloaded method with a configuration parameter for the remote 
> environment fixes that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to