This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4f689b1 [FLINK-10205] Introduce fault tolerance for InputSplits in batch execution 4f689b1 is described below commit 4f689b1994eb2af584b447dc2f518e5d37bc2b6f Author: Ryantaocer <43952554+ryantao...@users.noreply.github.com> AuthorDate: Wed Apr 10 17:45:14 2019 +0800 [FLINK-10205] Introduce fault tolerance for InputSplits in batch execution This closes #8125. --- .../api/common/io/DefaultInputSplitAssigner.java | 9 ++ .../api/common/io/LocatableInputSplitAssigner.java | 13 ++ .../common/io/ReplicatingInputSplitAssigner.java | 8 ++ .../apache/flink/core/io/InputSplitAssigner.java | 9 ++ .../java/distcp/FileCopyTaskInputFormat.java | 10 ++ .../flink/runtime/executiongraph/Execution.java | 7 ++ .../runtime/executiongraph/ExecutionVertex.java | 44 +++++-- .../apache/flink/runtime/jobmaster/JobMaster.java | 9 +- .../flink/runtime/jobmaster/JobMasterTest.java | 136 ++++++++++++++++++++- .../classloading/jar/CustomInputSplitProgram.java | 9 ++ .../jar/StreamingCustomInputSplitProgram.java | 9 ++ 11 files changed, 243 insertions(+), 20 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java index 25acc42..f8fd187 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java @@ -72,4 +72,13 @@ public class DefaultInputSplitAssigner implements InputSplitAssigner { } return next; } + + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + synchronized (this.splits) { + for (InputSplit split : splits) { + this.splits.add(split); + } + } + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java index 517f40a..7cfcf9f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java @@ -21,11 +21,13 @@ package org.apache.flink.api.common.io; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; +import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.flink.annotation.Public; +import org.apache.flink.core.io.InputSplit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.core.io.InputSplitAssigner; @@ -202,6 +204,17 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner { } } + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + synchronized (this.unassigned) { + for (InputSplit split : splits) { + LocatableInputSplitWithCount lisw = new LocatableInputSplitWithCount((LocatableInputSplit) split); + this.remoteSplitChooser.addInputSplit(lisw); + this.unassigned.add(lisw); + } + } + } + private static final boolean isLocal(String flinkHost, String[] hosts) { if (flinkHost == null || hosts == null) { return false; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java index e7dda94..ee2c721 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java @@ -21,9 +21,11 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import 
org.apache.flink.util.Preconditions; import java.util.Arrays; import java.util.Collection; +import java.util.List; /** * Assigns each InputSplit to each requesting parallel instance. @@ -78,4 +80,10 @@ public class ReplicatingInputSplitAssigner implements InputSplitAssigner { } } + + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + Preconditions.checkArgument(taskId >=0 && taskId < assignCounts.length); + assignCounts[taskId] = 0; + } } diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java index a17dfbe..fcf741d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java @@ -21,6 +21,8 @@ package org.apache.flink.core.io; import org.apache.flink.annotation.PublicEvolving; +import java.util.List; + /** * An input split assigner distributes the {@link InputSplit}s among the instances on which a * data source exists. @@ -38,4 +40,11 @@ public interface InputSplitAssigner { */ InputSplit getNextInputSplit(String host, int taskId); + /** + * Return the splits to assigner if the task failed to process it. + * + * @param splits The list of input splits to be returned. + * @param taskId The id of the task that failed to process the input splits. 
+ * */ + void returnInputSplit(List<InputSplit> splits, int taskId); } diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java index dfd9bf0..fbf7da4 100644 --- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,15 @@ public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCo LOGGER.info("Getting copy task for task: " + taskId); return splits.poll(); } + + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + synchronized (this.splits) { + for (InputSplit split : splits) { + Preconditions.checkState(this.splits.add((FileCopyTaskInputSplit) split)); + } + } + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index d5f091f..dd8db3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.time.Time; +import 
org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -316,6 +317,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } + public InputSplit getNextInputSplit() { + final LogicalSlot slot = this.getAssignedResource(); + final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null; + return this.vertex.getNextInputSplit(host); + } + @Override public TaskManagerLocation getAssignedResourceLocation() { // returns non-null only when a location is already assigned diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 42340b1..fc2a0f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; @@ -107,6 +109,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi /** The current or latest execution attempt of this vertex's task. 
*/ private volatile Execution currentExecution; // this field must never be null + private final ArrayList<InputSplit> inputSplits; + // -------------------------------------------------------------------------------------------- /** @@ -188,6 +192,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi getExecutionGraph().registerExecution(currentExecution); this.timeout = timeout; + this.inputSplits = new ArrayList<>(); } @@ -242,8 +247,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } public ExecutionEdge[] getInputEdges(int input) { - if (input < 0 || input >= this.inputEdges.length) { - throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length)); + if (input < 0 || input >= inputEdges.length) { + throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, inputEdges.length)); } return inputEdges[input]; } @@ -252,6 +257,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi return locationConstraint; } + public InputSplit getNextInputSplit(String host) { + final int taskId = getParallelSubtaskIndex(); + synchronized (inputSplits) { + final InputSplit nextInputSplit = jobVertex.getSplitAssigner().getNextInputSplit(host, taskId); + if (nextInputSplit != null) { + inputSplits.add(nextInputSplit); + } + return nextInputSplit; + } + } + @Override public Execution getCurrentExecutionAttempt() { return currentExecution; @@ -371,7 +387,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } - this.inputEdges[inputNumber] = edges; + inputEdges[inputNumber] = edges; // add the consumers to the source // for now (until the receiver initiated handshake is in place), we need to register the @@ -594,11 +610,19 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi timestamp, timeout); - this.currentExecution = newExecution; + 
currentExecution = newExecution; + + synchronized (inputSplits) { + InputSplitAssigner assigner = jobVertex.getSplitAssigner(); + if (assigner != null) { + assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex()); + inputSplits.clear(); + } + } CoLocationGroup grp = jobVertex.getCoLocationGroup(); if (grp != null) { - this.locationConstraint = grp.getLocationConstraint(subTaskIndex); + locationConstraint = grp.getLocationConstraint(subTaskIndex); } // register this execution at the execution graph, to receive call backs @@ -643,8 +667,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi @VisibleForTesting public void deployToSlot(LogicalSlot slot) throws JobException { - if (this.currentExecution.tryAssignResource(slot)) { - this.currentExecution.deploy(); + if (currentExecution.tryAssignResource(slot)) { + currentExecution.deploy(); } else { throw new IllegalStateException("Could not assign resource " + slot + " to current execution " + currentExecution + '.'); @@ -659,7 +683,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi public CompletableFuture<?> cancel() { // to avoid any case of mixup in the presence of concurrent calls, // we copy a reference to the stack to make sure both calls go to the same Execution - final Execution exec = this.currentExecution; + final Execution exec = currentExecution; exec.cancel(); return exec.getReleaseFuture(); } @@ -669,11 +693,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } public void stop() { - this.currentExecution.stop(); + currentExecution.stop(); } public void fail(Throwable t) { - this.currentExecution.fail(t); + currentExecution.fail(t); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index afd39e0..845a40b 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; @@ -566,16 +565,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID)); } - final InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); - if (splitAssigner == null) { + if (vertex.getSplitAssigner() == null) { log.error("No InputSplitAssigner for vertex ID {}.", vertexID); return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID)); } - final LogicalSlot slot = execution.getAssignedResource(); - final int taskId = execution.getVertex().getParallelSubtaskIndex(); - final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null; - final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId); + final InputSplit nextInputSplit = execution.getNextInputSplit(); if (log.isDebugEnabled()) { log.debug("Send next input split {}.", nextInputSplit); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f1baf80..396754b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -22,14 +22,18 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -54,13 +58,16 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -68,6 +75,7 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -87,6 +95,8 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.operators.DataSourceTask; +import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.query.KvStateLocation; import 
org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -128,6 +138,7 @@ import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -164,6 +175,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -831,6 +843,124 @@ public class JobMasterTest extends TestLogger { } } + private JobGraph createDataSourceJobGraph() throws Exception { + final TextInputFormat inputFormat = new TextInputFormat(new Path(".")); + final InputFormatVertex producer = new InputFormatVertex("Producer"); + new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat)); + producer.setInvokableClass(DataSourceTask.class); + + final JobVertex consumer = new JobVertex("Consumer"); + consumer.setInvokableClass(NoOpInvokable.class); + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(producer, consumer); + jobGraph.setAllowQueuedScheduling(true); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0)); + jobGraph.setExecutionConfig(executionConfig); + + return jobGraph; + } + + /** + * Tests the {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)} + * validate that it will get same result for a different retry + */ + @Test + public void testRequestNextInputSplitWithDataSourceFailover() throws Exception { + + final JobGraph dataSourceJobGraph = 
createDataSourceJobGraph(); + configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, + FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME); + final JobMaster jobMaster = createJobMaster( + configuration, + dataSourceJobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatServices); + + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId); + + try { + // wait for the start to complete + startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + tddFuture.complete(taskDeploymentDescriptor); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final AllocationID allocationId = allocationIdFuture.get(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, 
testingTimeout).get(); + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get(); + + assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers, contains(slotOffer)); + + // obtain tdd for the result partition ids + final TaskDeploymentDescriptor tdd = tddFuture.get(); + + final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + final TaskInformation taskInformation = tdd.getSerializedTaskInformation() + .deserializeValue(getClass().getClassLoader()); + JobVertexID vertexID = taskInformation.getJobVertexId(); + + //get the previous split + SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get(); + + //start a new version of this execution + ExecutionGraph executionGraph = jobMaster.getExecutionGraph(); + Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId()); + ExecutionVertex executionVertex = execution.getVertex(); + + gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FAILED)).get(); + Execution newExecution = executionVertex.getCurrentExecutionAttempt(); + + //get the new split + SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get(); + + Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData()); + + //get the new split3 + SerializedInputSplit split3 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get(); + + Assert.assertNotEquals(split1.getInputSplitData().length, split3.getInputSplitData().length); + gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get(); + InputSplit nullSplit = InstantiationUtil.deserializeObject( + 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader()); + Assert.assertNull(nullSplit); + + InputSplit nullSplit1 = InstantiationUtil.deserializeObject( + gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader()); + Assert.assertNull(nullSplit1); + + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + @Test public void testRequestNextInputSplit() throws Exception { final List<TestingInputSplit> expectedInputSplits = Arrays.asList( @@ -1319,9 +1449,9 @@ public class JobMasterTest extends TestLogger { @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final boolean cancelJob, - final Time timeout) { + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { return new CompletableFuture<>(); } }; diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java index a5a2531..fe6c172 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java @@ -151,5 +151,14 @@ public class CustomInputSplitProgram { } } } + + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + synchronized (this) { + for (InputSplit split : splits) { + remainingSplits.add((CustomInputSplit) split); + } + } + } } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java index 69421a6..26fe96a 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java @@ -157,6 +157,15 @@ public class StreamingCustomInputSplitProgram { } } } + + @Override + public void returnInputSplit(List<InputSplit> splits, int taskId) { + synchronized (this) { + for (InputSplit split : splits) { + remainingSplits.add((CustomInputSplit) split); + } + } + } } private static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {