[
https://issues.apache.org/jira/browse/FLINK-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649932#comment-16649932
]
ASF GitHub Bot commented on FLINK-10530:
tillrohrmann closed pull request #6827: [FLINK-10530][tests] Harden
ProcessFailureCancelingITCase and AbstractTaskManagerProcessFailureRecovery
URL: https://github.com/apache/flink/pull/6827
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5d7f26bb886..83298aa78ec 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -18,18 +18,19 @@
package org.apache.flink.test.recovery;
+import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerResource;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
@@ -42,6 +43,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
@@ -76,6 +79,9 @@
@Rule
public final BlobServerResource blobServerResource = new
BlobServerResource();
+ @Rule
+ public final ZooKeeperResource zooKeeperResource = new
ZooKeeperResource();
+
@Test
public void testTaskManagerProcessFailure() throws Exception {
@@ -89,18 +95,19 @@ public void testTaskManagerProcessFailure() throws
Exception {
File coordinateTempDir = null;
- final int jobManagerPort = NetUtils.getAvailablePort();
- final int restPort = NetUtils.getAvailablePort();
-
- Configuration jmConfig = new Configuration();
- jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
- jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
- jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
- jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
500L);
- jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT,
10000L);
- jmConfig.setInteger(RestOptions.PORT, restPort);
-
- try (final StandaloneSessionClusterEntrypoint clusterEntrypoint
= new StandaloneSessionClusterEntrypoint(jmConfig)) {
+ Configuration config = new Configuration();
+ config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
500L);
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT,
10000L);
+ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
zooKeeperResource.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getAbsolutePath());
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+ config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+
+ try (final StandaloneSessionClusterEntrypoint clusterEntrypoint
= new StandaloneSessionClusterEntrypoint(config)) {
// check that we run this test only if the java command
// is available on this machine