This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8650e40 [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api 8650e40 is described below commit 8650e409763a9b4eba92c77cc02ca22edc0230af Author: TsReaper <tsreape...@gmail.com> AuthorDate: Sun May 17 21:47:31 2020 +0800 [FLINK-17735][streaming] Add an iterator to collect sink results through coordination rest api This closes #12073 --- .../operators/collect/CollectResultFetcher.java | 334 +++++++++++++++++++++ .../operators/collect/CollectResultIterator.java | 95 ++++++ .../api/operators/collect/CollectSinkFunction.java | 33 +- .../collect/CollectSinkOperatorCoordinator.java | 22 +- .../collect/CollectSinkOperatorFactory.java | 21 +- .../collect/CollectResultIteratorTest.java | 132 ++++++++ .../operators/collect/CollectSinkFunctionTest.java | 206 ++++++++----- .../CollectSinkOperatorCoordinatorTest.java | 6 +- .../collect/utils/CollectRequestSender.java | 31 -- .../operators/collect/utils/TestCollectClient.java | 141 --------- .../utils/TestCoordinationRequestHandler.java | 213 +++++++++++++ .../api/operators/collect/utils/TestJobClient.java | 133 ++++++++ 12 files changed, 1095 insertions(+), 272 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java new file mode 100644 index 0000000..ec7286b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.collect; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A fetcher which fetches query results from sink and provides exactly-once semantics. 
+ */ +public class CollectResultFetcher<T> { + + private static final int DEFAULT_RETRY_MILLIS = 100; + private static final long DEFAULT_ACCUMULATOR_GET_MILLIS = 10000; + + private static final Logger LOG = LoggerFactory.getLogger(CollectResultFetcher.class); + + private final CompletableFuture<OperatorID> operatorIdFuture; + private final String accumulatorName; + private final int retryMillis; + + private ResultBuffer buffer; + + @Nullable + private JobClient jobClient; + @Nullable + private CoordinationRequestGateway gateway; + + private boolean jobTerminated; + private boolean closed; + + public CollectResultFetcher( + CompletableFuture<OperatorID> operatorIdFuture, + TypeSerializer<T> serializer, + String accumulatorName) { + this( + operatorIdFuture, + serializer, + accumulatorName, + DEFAULT_RETRY_MILLIS); + } + + CollectResultFetcher( + CompletableFuture<OperatorID> operatorIdFuture, + TypeSerializer<T> serializer, + String accumulatorName, + int retryMillis) { + this.operatorIdFuture = operatorIdFuture; + this.accumulatorName = accumulatorName; + this.retryMillis = retryMillis; + + this.buffer = new ResultBuffer(serializer); + + this.jobTerminated = false; + this.closed = false; + } + + public void setJobClient(JobClient jobClient) { + Preconditions.checkArgument( + jobClient instanceof CoordinationRequestGateway, + "Job client must be a CoordinationRequestGateway. 
This is a bug."); + this.jobClient = jobClient; + this.gateway = (CoordinationRequestGateway) jobClient; + } + + public T next() throws IOException { + if (closed) { + return null; + } + + // this is to avoid sleeping before first try + boolean beforeFirstTry = true; + do { + T res = buffer.next(); + if (res != null) { + // we still have user-visible results, just use them + return res; + } else if (jobTerminated) { + // no user-visible results, but job has terminated, we have to return + return null; + } else if (!beforeFirstTry) { + // no results but job is still running, sleep before retry + sleepBeforeRetry(); + } + beforeFirstTry = false; + + if (isJobTerminated()) { + // job terminated, read results from accumulator + jobTerminated = true; + Tuple2<Long, CollectCoordinationResponse<T>> accResults = getAccumulatorResults(); + buffer.dealWithResponse(accResults.f1, accResults.f0); + buffer.complete(); + } else { + // job still running, try to fetch some results + long requestOffset = buffer.offset; + CollectCoordinationResponse<T> response; + try { + response = sendRequest(buffer.version, requestOffset); + } catch (Exception e) { + LOG.warn("An exception occurs when fetching query results", e); + continue; + } + // the response will contain data (if any) starting exactly from requested offset + buffer.dealWithResponse(response, requestOffset); + } + } while (true); + } + + public void close() { + if (closed) { + return; + } + + cancelJob(); + closed = true; + } + + @SuppressWarnings("unchecked") + private CollectCoordinationResponse<T> sendRequest( + String version, + long offset) throws InterruptedException, ExecutionException { + checkJobClientConfigured(); + + OperatorID operatorId = operatorIdFuture.getNow(null); + Preconditions.checkNotNull(operatorId, "Unknown operator ID. 
This is a bug."); + + CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset); + return (CollectCoordinationResponse<T>) gateway.sendCoordinationRequest(operatorId, request).get(); + } + + private Tuple2<Long, CollectCoordinationResponse<T>> getAccumulatorResults() throws IOException { + checkJobClientConfigured(); + + JobExecutionResult executionResult; + try { + // this timeout is sort of hack, see comments in isJobTerminated for explanation + executionResult = jobClient.getJobExecutionResult(getClass().getClassLoader()).get( + DEFAULT_ACCUMULATOR_GET_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IOException("Failed to fetch job execution result", e); + } + + ArrayList<byte[]> accResults = executionResult.getAccumulatorResult(accumulatorName); + if (accResults == null) { + // job terminates abnormally + throw new IOException("Job terminated abnormally, no job execution result can be fetched"); + } + + try { + List<byte[]> serializedResults = + SerializedListAccumulator.deserializeList(accResults, BytePrimitiveArraySerializer.INSTANCE); + byte[] serializedResult = serializedResults.get(0); + return CollectSinkFunction.deserializeAccumulatorResult(serializedResult); + } catch (ClassNotFoundException | IOException e) { + // this is impossible + throw new IOException("Failed to deserialize accumulator results", e); + } + } + + private boolean isJobTerminated() { + checkJobClientConfigured(); + + try { + JobStatus status = jobClient.getJobStatus().get(); + return status.isGloballyTerminalState(); + } catch (Exception e) { + // TODO + // This is sort of hack. + // Currently different execution environment will have different behaviors + // when fetching a finished job status. 
+ // For example, standalone session cluster will return a normal FINISHED, + // while mini cluster will throw IllegalStateException, + // and yarn per job will throw ApplicationNotFoundException. + // We have to assume that job has finished in this case. + // Change this when these behaviors are unified. + LOG.warn("Failed to get job status so we assume that the job has terminated. Some data might be lost.", e); + return true; + } + } + + private void cancelJob() { + checkJobClientConfigured(); + + if (!isJobTerminated()) { + jobClient.cancel(); + } + } + + private void sleepBeforeRetry() { + if (retryMillis <= 0) { + return; + } + + try { + // TODO a more proper retry strategy? + Thread.sleep(retryMillis); + } catch (InterruptedException e) { + LOG.warn("Interrupted when sleeping before a retry", e); + } + } + + private void checkJobClientConfigured() { + Preconditions.checkNotNull(jobClient, "Job client must be configured before first use."); + Preconditions.checkNotNull(gateway, "Coordination request gateway must be configured before first use."); + } + + /** + * A buffer which encapsulates the logic of dealing with the response from the {@link CollectSinkFunction}. + * See Java doc of {@link CollectSinkFunction} for explanation of this communication protocol. 
+ */ + private class ResultBuffer { + + private static final String INIT_VERSION = ""; + + private final LinkedList<T> buffer; + private final TypeSerializer<T> serializer; + + // for detailed explanation of the following 3 variables, see Java doc of CollectSinkFunction + // `version` is to check if the sink restarts + private String version; + // `offset` is the offset of the next result we want to fetch + private long offset; + + // userVisibleHead <= user visible results offset < userVisibleTail + private long userVisibleHead; + private long userVisibleTail; + + private ResultBuffer(TypeSerializer<T> serializer) { + this.buffer = new LinkedList<>(); + this.serializer = serializer; + + this.version = INIT_VERSION; + this.offset = 0; + + this.userVisibleHead = 0; + this.userVisibleTail = 0; + } + + private T next() { + if (userVisibleHead == userVisibleTail) { + return null; + } + T ret = buffer.removeFirst(); + userVisibleHead++; + + sanityCheck(); + return ret; + } + + private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException { + String responseVersion = response.getVersion(); + long responseLastCheckpointedOffset = response.getLastCheckpointedOffset(); + List<T> results = response.getResults(serializer); + + // we first check version in the response to decide whether we should throw away dirty results + if (!version.equals(responseVersion)) { + // sink restarted, we revert back to where the sink tells us + for (long i = 0; i < offset - responseLastCheckpointedOffset; i++) { + buffer.removeLast(); + } + version = responseVersion; + offset = responseLastCheckpointedOffset; + } + + // we now check if more results can be seen by the user + if (responseLastCheckpointedOffset > userVisibleTail) { + // lastCheckpointedOffset increases, this means that more results have been + // checkpointed, and we can give these results to the user + userVisibleTail = responseLastCheckpointedOffset; + } + + if (!results.isEmpty()) 
{ + // response contains some data, add them to buffer + int addStart = (int) (offset - responseOffset); + List<T> addedResults = results.subList(addStart, results.size()); + buffer.addAll(addedResults); + offset += addedResults.size(); + } + + sanityCheck(); + } + + private void complete() { + userVisibleTail = offset; + } + + private void sanityCheck() { + Preconditions.checkState( + userVisibleHead <= userVisibleTail, + "userVisibleHead should not be larger than userVisibleTail. This is a bug."); + Preconditions.checkState( + userVisibleTail <= offset, + "userVisibleTail should not be larger than offset. This is a bug."); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java new file mode 100644 index 0000000..bbf41e1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.collect; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.OperatorID; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +/** + * An iterator which iterates through the results of a query job. + * + * <p>NOTE: After using this iterator, the close method MUST be called in order to release job related resources. + */ +public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable { + + private final CollectResultFetcher<T> fetcher; + private T bufferedResult; + + public CollectResultIterator( + CompletableFuture<OperatorID> operatorIdFuture, + TypeSerializer<T> serializer, + String accumulatorName) { + this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName); + this.bufferedResult = null; + } + + @VisibleForTesting + public CollectResultIterator( + CompletableFuture<OperatorID> operatorIdFuture, + TypeSerializer<T> serializer, + String accumulatorName, + int retryMillis) { + this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName, retryMillis); + this.bufferedResult = null; + } + + @Override + public boolean hasNext() { + // we have to make sure that the next result exists + // it is possible that there is no more result but the job is still running + if (bufferedResult == null) { + bufferedResult = nextResultFromFetcher(); + } + return bufferedResult != null; + } + + @Override + public T next() { + if (bufferedResult == null) { + bufferedResult = nextResultFromFetcher(); + } + T ret = bufferedResult; + bufferedResult = null; + return ret; + } + + @Override + public void close() throws Exception { + fetcher.close(); + } + + public void setJobClient(JobClient jobClient) { + fetcher.setJobClient(jobClient); + } + + 
private T nextResultFromFetcher() { + try { + return fetcher.next(); + } catch (IOException e) { + fetcher.close(); + throw new RuntimeException("Failed to fetch next result", e); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java index 21e889b..64a3ffe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.operators.collect; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; @@ -78,7 +79,9 @@ import java.util.concurrent.locks.ReentrantLock; * before this offset. Sink can safely throw these results away.</li> * <li><strong>lastCheckpointedOffset</strong>: * This is the value of <code>offset</code> when the checkpoint happens. This value will be - * restored from the checkpoint and set back to <code>offset</code> when the sink restarts.</li> + * restored from the checkpoint and set back to <code>offset</code> when the sink restarts. + * Clients who need exactly-once semantics need to rely on this value for the position to + * revert when a failover happens.</li> * </ol> * * <p>Client will put <code>version</code> and <code>offset</code> into the request, indicating that @@ -97,9 +100,9 @@ import java.util.concurrent.locks.ReentrantLock; * <ol> * <li>If the version mismatches, client knows that sink has restarted. 
It will throw away all uncheckpointed * results after <code>lastCheckpointedOffset</code>.</li> - * <li>Otherwise the version matches. If <code>lastCheckpointedOffset</code> increases, client knows that - * a checkpoint happens. It can now move all results before this offset to a user-visible buffer. If - * the response also contains new results, client will now move these new results into uncheckpointed + * <li>If <code>lastCheckpointedOffset</code> increases, client knows that + * a checkpoint happens. It can now move all results before this offset to a user-visible buffer.</li> + * <li>If the response also contains new results, client will now move these new results into uncheckpointed * buffer.</li> * </ol> * @@ -264,7 +267,9 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che // put results not consumed by the client into the accumulator // so that we do not block the closing procedure while not throwing results away SerializedListAccumulator<byte[]> accumulator = new SerializedListAccumulator<>(); - accumulator.add(serializeAccumulatorResult(), BytePrimitiveArraySerializer.INSTANCE); + accumulator.add( + serializeAccumulatorResult(offset, version, lastCheckpointedOffset, bufferedResults, serializer), + BytePrimitiveArraySerializer.INSTANCE); getRuntimeContext().addAccumulator(accumulatorName, accumulator); } finally { bufferedResultsLock.unlock(); @@ -286,22 +291,28 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che this.eventGateway = eventGateway; } - private byte[] serializeAccumulatorResult() throws IOException { + @VisibleForTesting + public static <T> byte[] serializeAccumulatorResult( + long offset, + String version, + long lastCheckpointedOffset, + List<T> bufferedResults, + TypeSerializer<T> serializer) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); wrapper.writeLong(offset); - 
CollectCoordinationResponse<IN> finalResponse = + CollectCoordinationResponse<T> finalResponse = new CollectCoordinationResponse<>(version, lastCheckpointedOffset, bufferedResults, serializer); finalResponse.serialize(wrapper); return baos.toByteArray(); } - public static Tuple2<Long, CollectCoordinationResponse> deserializeAccumulatorResult( + public static <T> Tuple2<Long, CollectCoordinationResponse<T>> deserializeAccumulatorResult( byte[] serializedAccResults) throws IOException { ByteArrayInputStream bais = new ByteArrayInputStream(serializedAccResults); DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(bais); long token = wrapper.readLong(); - CollectCoordinationResponse finalResponse = new CollectCoordinationResponse(wrapper); + CollectCoordinationResponse<T> finalResponse = new CollectCoordinationResponse<>(wrapper); return Tuple2.of(token, finalResponse); } @@ -395,8 +406,8 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che private void close() { running = false; + closeServerSocket(); closeCurrentConnection(); - closeServer(); } private InetSocketAddress getServerSocketAddress() { @@ -444,7 +455,7 @@ public class CollectSinkFunction<IN> extends RichSinkFunction<IN> implements Che } } - private void closeServer() { + private void closeServerSocket() { try { serverSocket.close(); } catch (Exception e) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java index dea9bf3..6c84266 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java @@ -56,6 +56,8 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor private static final Logger LOG = LoggerFactory.getLogger(CollectSinkOperatorCoordinator.class); + private final int socketTimeout; + private InetSocketAddress address; private Socket socket; private DataInputViewStreamWrapper inStream; @@ -63,6 +65,10 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor private ExecutorService executorService; + public CollectSinkOperatorCoordinator(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + @Override public void start() throws Exception { this.executorService = @@ -107,12 +113,20 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor CollectCoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture, InetSocketAddress sinkAddress) { + if (sinkAddress == null) { + closeConnection(); + completeWithEmptyResponse(request, responseFuture); + return; + } + try { if (socket == null) { - socket = new Socket(sinkAddress.getAddress(), sinkAddress.getPort()); + socket = new Socket(); + socket.setSoTimeout(socketTimeout); socket.setKeepAlive(true); socket.setTcpNoDelay(true); + socket.connect(sinkAddress); inStream = new DataInputViewStreamWrapper(socket.getInputStream()); outStream = new DataOutputViewStreamWrapper(socket.getOutputStream()); LOG.info("Sink connection established"); @@ -200,9 +214,11 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor public static class Provider implements OperatorCoordinator.Provider { private final OperatorID operatorId; + private final int socketTimeout; - public Provider(OperatorID operatorId) { + public Provider(OperatorID operatorId, int socketTimeout) { this.operatorId = operatorId; + this.socketTimeout = socketTimeout; } @Override @@ -213,7 +229,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor @Override public OperatorCoordinator create(Context context) { // we do not send operator event so we don't 
need a context - return new CollectSinkOperatorCoordinator(); + return new CollectSinkOperatorCoordinator(socketTimeout); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java index 8fa9843..e177f79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.api.operators.collect; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; @@ -28,19 +29,25 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; /** * The Factory class for {@link CollectSinkOperator}. 
*/ -@SuppressWarnings("unchecked") -public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> { +public class CollectSinkOperatorFactory<IN> extends SimpleUdfStreamOperatorFactory<Object> implements CoordinatedOperatorFactory<Object> { private static final long serialVersionUID = 1L; - private final CollectSinkOperator<?> operator; + private final CollectSinkOperator<IN> operator; + private final int socketTimeout; - public CollectSinkOperatorFactory(CollectSinkOperator<?> operator) { - super(operator); - this.operator = operator; + public CollectSinkOperatorFactory( + TypeSerializer<IN> serializer, + int maxResultsPerBatch, + String accumulatorName, + int socketTimeout) { + super(new CollectSinkOperator<>(serializer, maxResultsPerBatch, accumulatorName)); + this.operator = (CollectSinkOperator<IN>) getOperator(); + this.socketTimeout = socketTimeout; } @Override + @SuppressWarnings("unchecked") public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) { final OperatorID operatorId = parameters.getStreamConfig().getOperatorID(); final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher(); @@ -55,6 +62,6 @@ public class CollectSinkOperatorFactory extends SimpleUdfStreamOperatorFactory<O @Override public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) { operator.getOperatorIdFuture().complete(operatorID); - return new CollectSinkOperatorCoordinator.Provider(operatorID); + return new CollectSinkOperatorCoordinator.Provider(operatorID, socketTimeout); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java new file mode 100644 index 0000000..021689f --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectResultIteratorTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.collect; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.operators.collect.utils.TestCoordinationRequestHandler; +import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient; +import org.apache.flink.util.OptionalFailure; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Tests for {@link CollectResultIterator}. 
+ */ +public class CollectResultIteratorTest extends TestLogger { + + private static final OperatorID TEST_OPERATOR_ID = new OperatorID(); + private static final JobID TEST_JOB_ID = new JobID(); + private static final String ACCUMULATOR_NAME = "accumulatorName"; + + @Test + public void testIteratorWithCheckpointAndFailure() throws Exception { + // run this random test multiple times + for (int testCount = 1000; testCount > 0; testCount--) { + List<Integer> expected = new ArrayList<>(); + for (int i = 0; i < 200; i++) { + expected.add(i); + } + + CollectResultIterator<Integer> iterator = createIteratorAndJobClient(expected, IntSerializer.INSTANCE).f0; + + List<Integer> actual = new ArrayList<>(); + while (iterator.hasNext()) { + actual.add(iterator.next()); + } + Assert.assertEquals(expected.size(), actual.size()); + + Collections.sort(expected); + Collections.sort(actual); + Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0])); + + iterator.close(); + } + } + + @Test + public void testEarlyClose() throws Exception { + List<Integer> expected = new ArrayList<>(); + for (int i = 0; i < 200; i++) { + expected.add(i); + } + + Tuple2<CollectResultIterator<Integer>, JobClient> tuple2 = + createIteratorAndJobClient(expected, IntSerializer.INSTANCE); + CollectResultIterator<Integer> iterator = tuple2.f0; + JobClient jobClient = tuple2.f1; + + for (int i = 0; i < 100; i++) { + Assert.assertTrue(iterator.hasNext()); + Assert.assertNotNull(iterator.next()); + } + Assert.assertTrue(iterator.hasNext()); + iterator.close(); + + Assert.assertEquals(JobStatus.CANCELED, jobClient.getJobStatus().get()); + } + + private Tuple2<CollectResultIterator<Integer>, JobClient> createIteratorAndJobClient( + List<Integer> expected, + TypeSerializer<Integer> serializer) { + CollectResultIterator<Integer> iterator = new CollectResultIterator<>( + CompletableFuture.completedFuture(TEST_OPERATOR_ID), + serializer, + ACCUMULATOR_NAME, + 0); + + 
TestCoordinationRequestHandler<Integer> handler = new TestCoordinationRequestHandler<>( + expected, serializer, ACCUMULATOR_NAME); + + TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() { + + @Override + public boolean isJobFinished() { + return handler.isClosed(); + } + + @Override + public Map<String, OptionalFailure<Object>> getAccumulatorResults() { + return handler.getAccumulatorResults(); + } + }; + + TestJobClient jobClient = new TestJobClient( + TEST_JOB_ID, + TEST_OPERATOR_ID, + handler, + infoProvider); + iterator.setJobClient(jobClient); + + return Tuple2.of(iterator, jobClient); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java index 1483998..1363d87 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java @@ -17,25 +17,24 @@ package org.apache.flink.streaming.api.operators.collect; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.operators.collect.utils.CollectRequestSender; import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext; import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; -import org.apache.flink.streaming.api.operators.collect.utils.TestCollectClient; +import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; -import org.apache.flink.types.Row; +import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -47,11 +46,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -61,13 +62,16 @@ public class CollectSinkFunctionTest extends TestLogger { private static final int MAX_RESULTS_PER_BATCH = 3; private static final String ACCUMULATOR_NAME = "tableCollectAccumulator"; - private static final long TIME_OUT_MILLIS = 10000; + private static final int FUTURE_TIMEOUT_MILLIS = 10000; + private static final int SOCKET_TIMEOUT_MILLIS = 1000; private static final int MAX_RETIRES = 100; - private static final TypeSerializer<Row> serializer = - new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO).createSerializer(new ExecutionConfig()); + private static final JobID TEST_JOB_ID = new JobID(); + private static final OperatorID TEST_OPERATOR_ID 
= new OperatorID(); - private CollectSinkFunction<Row> function; + private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE; + + private CollectSinkFunction<Integer> function; private CollectSinkOperatorCoordinator coordinator; private MockFunctionInitializationContext functionInitializationContext; private boolean jobFinished; @@ -81,7 +85,7 @@ public class CollectSinkFunctionTest extends TestLogger { ioManager = new IOManagerAsync(); runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, ioManager); gateway = new MockOperatorEventGateway(); - coordinator = new CollectSinkOperatorCoordinator(); + coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); coordinator.start(); // only used in checkpointed tests @@ -101,10 +105,10 @@ public class CollectSinkFunctionTest extends TestLogger { openFunction(); for (int i = 0; i < 6; i++) { // CollectSinkFunction never use context when invoked - function.invoke(Row.of(i), null); + function.invoke(i, null); } - CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0); + CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0); Assert.assertEquals(0, response.getLastCheckpointedOffset()); String version = response.getVersion(); @@ -118,7 +122,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 0, Collections.emptyList()); for (int i = 6; i < 10; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } // invalid request @@ -135,7 +139,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 0, Collections.emptyList()); for (int i = 10; i < 16; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 12); @@ -151,10 +155,10 @@ public class CollectSinkFunctionTest extends TestLogger { openFunctionWithState(); for (int i = 0; i < 2; i++) { // 
CollectSinkFunction never use context when invoked - function.invoke(Row.of(i), null); + function.invoke(i, null); } - CollectCoordinationResponse<Row> response = sendRequestAndGetValidResponse("", 0); + CollectCoordinationResponse<Integer> response = sendRequestAndGetValidResponse("", 0); Assert.assertEquals(0, response.getLastCheckpointedOffset()); String version = response.getVersion(); @@ -162,7 +166,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 0, Arrays.asList(0, 1)); for (int i = 2; i < 6; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 3); @@ -181,7 +185,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 3, Arrays.asList(4, 5)); for (int i = 6; i < 9; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 6); @@ -192,7 +196,7 @@ public class CollectSinkFunctionTest extends TestLogger { openFunctionWithState(); for (int i = 9; i < 12; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 4); @@ -208,7 +212,7 @@ public class CollectSinkFunctionTest extends TestLogger { checkpointFunction(2); checkpointComplete(2); - function.invoke(Row.of(12), null); + function.invoke(12, null); response = sendRequestAndGetValidResponse(version, 7); assertResponseEquals(response, version, 6, Arrays.asList(10, 11, 12)); @@ -228,7 +232,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 6, Collections.emptyList()); for (int i = 13; i < 17; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 9); @@ -249,7 +253,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 9, 
Collections.singletonList(16)); for (int i = 17; i < 20; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 12); @@ -270,7 +274,7 @@ public class CollectSinkFunctionTest extends TestLogger { assertResponseEquals(response, version, 9, Collections.singletonList(16)); for (int i = 20; i < 23; i++) { - function.invoke(Row.of(i), null); + function.invoke(i, null); } response = sendRequestAndGetValidResponse(version, 12); @@ -290,13 +294,9 @@ public class CollectSinkFunctionTest extends TestLogger { expected.add(i); } UncheckpointedDataFeeder feeder = new UncheckpointedDataFeeder(expected); - TestCollectClient<Row> client = new TestCollectClient<>( - serializer, - new TestCollectRequestSender(), - () -> jobFinished); - runFunctionWithClient(feeder, client); - assertResultsEqualAfterSort(expected, client.getResults()); + List<Integer> actual = runFunctionRandomTest(feeder); + assertResultsEqualAfterSort(expected, actual); after(); before(); @@ -312,23 +312,22 @@ public class CollectSinkFunctionTest extends TestLogger { expected.add(i); } CheckpointedDataFeeder feeder = new CheckpointedDataFeeder(expected); - TestCollectClient<Row> client = new TestCollectClient<>( - serializer, - new TestCollectRequestSender(), - () -> jobFinished); - runFunctionWithClient(feeder, client); - assertResultsEqualAfterSort(expected, client.getResults()); + List<Integer> actual = runFunctionRandomTest(feeder); + assertResultsEqualAfterSort(expected, actual); after(); before(); } } - private void runFunctionWithClient(Thread feeder, Thread client) throws Exception { + private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception { + CollectClient client = new CollectClient(); + Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> { feeder.interrupt(); client.interrupt(); + e.printStackTrace(); }; feeder.setUncaughtExceptionHandler(exceptionHandler); 
client.setUncaughtExceptionHandler(exceptionHandler); @@ -337,10 +336,13 @@ public class CollectSinkFunctionTest extends TestLogger { client.start(); feeder.join(); client.join(); + + return client.results; } private void openFunction() throws Exception { - function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME); + function = new CollectSinkFunction<>( + serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME); function.setRuntimeContext(runtimeContext); function.setOperatorEventGateway(gateway); function.open(new Configuration()); @@ -349,7 +351,8 @@ public class CollectSinkFunctionTest extends TestLogger { private void openFunctionWithState() throws Exception { functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint(); - function = new CollectSinkFunction<>(serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME); + function = new CollectSinkFunction<>( + serializer, MAX_RESULTS_PER_BATCH, ACCUMULATOR_NAME); function.setRuntimeContext(runtimeContext); function.setOperatorEventGateway(gateway); function.initializeState(functionInitializationContext); @@ -382,19 +385,19 @@ public class CollectSinkFunctionTest extends TestLogger { } @SuppressWarnings("unchecked") - private CollectCoordinationResponse<Row> sendRequest( + private CollectCoordinationResponse<Integer> sendRequest( String version, long offset) throws Exception { CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset); // we add a timeout to not block the tests return ((CollectCoordinationResponse) coordinator - .handleCoordinationRequest(request).get(TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)); + .handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); } - private CollectCoordinationResponse<Row> sendRequestAndGetValidResponse( + private CollectCoordinationResponse<Integer> sendRequestAndGetValidResponse( String version, long offset) throws Exception { - CollectCoordinationResponse<Row> 
response; + CollectCoordinationResponse<Integer> response; for (int i = 0; i < MAX_RETIRES; i++) { response = sendRequest(version, offset); if (response.getLastCheckpointedOffset() >= 0) { @@ -405,7 +408,7 @@ public class CollectSinkFunctionTest extends TestLogger { } @SuppressWarnings("unchecked") - private Tuple2<Long, CollectCoordinationResponse> getAccumualtorResults() throws Exception { + private Tuple2<Long, CollectCoordinationResponse<Integer>> getAccumualtorResults() throws Exception { Accumulator accumulator = runtimeContext.getAccumulator(ACCUMULATOR_NAME); ArrayList<byte[]> accLocalValue = ((SerializedListAccumulator) accumulator).getLocalValue(); List<byte[]> serializedResults = @@ -416,41 +419,35 @@ public class CollectSinkFunctionTest extends TestLogger { } private void assertResponseEquals( - CollectCoordinationResponse<Row> response, + CollectCoordinationResponse<Integer> response, String version, long lastCheckpointedOffset, List<Integer> expected) throws IOException { Assert.assertEquals(version, response.getVersion()); Assert.assertEquals(lastCheckpointedOffset, response.getLastCheckpointedOffset()); - List<Row> results = response.getResults(serializer); + List<Integer> results = response.getResults(serializer); assertResultsEqual(expected, results); } - private void assertResultsEqual(List<Integer> expected, List<Row> actual) { - Assert.assertEquals(expected.size(), actual.size()); - for (int i = 0; i < expected.size(); i++) { - Row row = actual.get(i); - Assert.assertEquals(1, row.getArity()); - Assert.assertEquals(expected.get(i), row.getField(0)); - } + private void assertResultsEqual(List<Integer> expected, List<Integer> actual) { + Assert.assertArrayEquals(expected.toArray(new Integer[0]), actual.toArray(new Integer[0])); } - private void assertResultsEqualAfterSort(List<Integer> expected, List<Row> actual) { + private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) { Collections.sort(expected); - 
actual.sort(Comparator.comparingInt(row -> (int) row.getField(0))); + Collections.sort(actual); assertResultsEqual(expected, actual); } - @SuppressWarnings("unchecked") private void assertAccumulatorResult( long expectedOffset, String expectedVersion, long expectedLastCheckpointedOffset, List<Integer> expectedResults) throws Exception { - Tuple2<Long, CollectCoordinationResponse> accResults = getAccumualtorResults(); + Tuple2<Long, CollectCoordinationResponse<Integer>> accResults = getAccumualtorResults(); long offset = accResults.f0; - CollectCoordinationResponse response = accResults.f1; - List<Row> actualResults = response.getResults(serializer); + CollectCoordinationResponse<Integer> response = accResults.f1; + List<Integer> actualResults = response.getResults(serializer); Assert.assertEquals(expectedOffset, offset); Assert.assertEquals(expectedVersion, response.getVersion()); @@ -458,19 +455,6 @@ public class CollectSinkFunctionTest extends TestLogger { assertResultsEqual(expectedResults, actualResults); } - private class TestCollectRequestSender implements CollectRequestSender<Row> { - - @Override - public CollectCoordinationResponse<Row> sendRequest(String version, long offset) throws Exception { - return CollectSinkFunctionTest.this.sendRequest(version, offset); - } - - @Override - public Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception { - return CollectSinkFunctionTest.this.getAccumualtorResults(); - } - } - /** * A thread feeding data to the function. It will fail when half of the data is fed. 
*/ @@ -496,7 +480,7 @@ public class CollectSinkFunctionTest extends TestLogger { while (data.size() > 0) { int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1); for (int i = 0; i < size; i++) { - function.invoke(Row.of(data.removeFirst()), null); + function.invoke(data.removeFirst(), null); } if (!failedBefore && data.size() < checkpointedData.size() / 2) { @@ -519,7 +503,6 @@ public class CollectSinkFunctionTest extends TestLogger { finishJob(); } catch (Exception e) { - e.printStackTrace(); throw new RuntimeException(e); } } @@ -571,7 +554,7 @@ public class CollectSinkFunctionTest extends TestLogger { // with 60% chance we add some data int size = Math.min(data.size(), random.nextInt(MAX_RESULTS_PER_BATCH * 3) + 1); for (int i = 0; i < size; i++) { - function.invoke(Row.of(data.removeFirst()), null); + function.invoke(data.removeFirst(), null); } } else if (r < 9) { // with 30% chance we make a checkpoint @@ -603,12 +586,14 @@ public class CollectSinkFunctionTest extends TestLogger { finishJob(); } catch (Exception e) { - e.printStackTrace(); throw new RuntimeException(e); } } } + /** + * Countdown for a checkpoint which will succeed in the future. + */ private static class CheckpointCountdown { private long id; @@ -629,4 +614,71 @@ public class CollectSinkFunctionTest extends TestLogger { return false; } } + + /** + * A thread collecting results with the collecting iterator. 
+ */ + private class CollectClient extends Thread { + + private List<Integer> results; + private CollectResultIterator<Integer> iterator; + + private CollectClient() { + this.results = new ArrayList<>(); + + this.iterator = new CollectResultIterator<>( + CompletableFuture.completedFuture(TEST_OPERATOR_ID), + serializer, + ACCUMULATOR_NAME, + 0 + ); + + TestJobClient.JobInfoProvider infoProvider = new TestJobClient.JobInfoProvider() { + + @Override + public boolean isJobFinished() { + return jobFinished; + } + + @Override + public Map<String, OptionalFailure<Object>> getAccumulatorResults() { + Map<String, OptionalFailure<Object>> accumulatorResults = new HashMap<>(); + accumulatorResults.put( + ACCUMULATOR_NAME, + OptionalFailure.of(runtimeContext.getAccumulator(ACCUMULATOR_NAME).getLocalValue())); + return accumulatorResults; + } + }; + + TestJobClient jobClient = new TestJobClient( + TEST_JOB_ID, + TEST_OPERATOR_ID, + coordinator, + infoProvider); + + iterator.setJobClient(jobClient); + } + + @Override + public void run() { + Random random = new Random(); + + while (iterator.hasNext()) { + results.add(iterator.next()); + if (random.nextBoolean()) { + try { + Thread.sleep(5); + } catch (InterruptedException e) { + // ignore + } + } + } + + try { + iterator.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java index d2ad57b..66a624a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java @@ -45,6 +45,8 @@ import java.util.concurrent.CompletableFuture; */ public class 
CollectSinkOperatorCoordinatorTest { + private static final int SOCKET_TIMEOUT_MILLIS = 1000; + private static final TypeSerializer<Row> serializer = new RowTypeInfo( BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO @@ -52,7 +54,7 @@ public class CollectSinkOperatorCoordinatorTest { @Test public void testNoAddress() throws Exception { - CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(); + CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); coordinator.start(); CollectCoordinationRequest request = new CollectCoordinationRequest("version", 123); @@ -65,7 +67,7 @@ public class CollectSinkOperatorCoordinatorTest { @Test public void testServerFailure() throws Exception { - CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(); + CollectSinkOperatorCoordinator coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS); coordinator.start(); List<List<Row>> expected = Arrays.asList( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java deleted file mode 100644 index fc7b759..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectRequestSender.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.collect.utils; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse; - -/** - * Testing interface for sending collect requests. - */ -public interface CollectRequestSender<T> { - - CollectCoordinationResponse<T> sendRequest(String version, long offset) throws Exception; - - Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception; -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java deleted file mode 100644 index 9040971..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCollectClient.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.collect.utils; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.function.BooleanSupplier; - -/** - * A simple client for fetching collect results. - */ -public class TestCollectClient<T> extends Thread { - - private static final String INIT_VERSION = ""; - private static final int MAX_RETRY_COUNT = 100; - - private final TypeSerializer<T> serializer; - private final CollectRequestSender<T> sender; - private final BooleanSupplier jobFinishedChecker; - - private final LinkedList<T> uncheckpointedResults; - private final LinkedList<T> checkpointedResults; - - private String version; - private long offset; - private long lastCheckpointedOffset; - private int retryCount; - - public TestCollectClient( - TypeSerializer<T> serializer, - CollectRequestSender<T> sender, - BooleanSupplier jobFinishedChecker) { - this.serializer = serializer; - this.sender = sender; - this.jobFinishedChecker = jobFinishedChecker; - - this.uncheckpointedResults = new LinkedList<>(); - this.checkpointedResults = new LinkedList<>(); - } - - @Override - @SuppressWarnings("unchecked") - public void run() { - Random random = new Random(); - - version = INIT_VERSION; - offset = 0; - lastCheckpointedOffset = 0; - retryCount = 0; - - try { - while 
(!jobFinishedChecker.getAsBoolean()) { - if (random.nextBoolean()) { - Thread.sleep(random.nextInt(10)); - } - CollectCoordinationResponse<T> response = sender.sendRequest(version, offset); - dealWithResponse(response, offset); - } - - Tuple2<Long, CollectCoordinationResponse> accResults = sender.getAccumulatorResults(); - dealWithResponse(accResults.f1, accResults.f0); - checkpointedResults.addAll(uncheckpointedResults); - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - public List<T> getResults() { - return checkpointedResults; - } - - private void dealWithResponse(CollectCoordinationResponse<T> response, long responseOffset) throws IOException { - String responseVersion = response.getVersion(); - long responseLastCheckpointedOffset = response.getLastCheckpointedOffset(); - List<T> responseResults = response.getResults(serializer); - - if (responseResults.isEmpty()) { - retryCount++; - } else { - retryCount = 0; - } - if (retryCount > MAX_RETRY_COUNT) { - // not to block the tests - throw new RuntimeException("Too many retries in TestCollectClient"); - } - - if (INIT_VERSION.equals(version)) { - // first response, update version accordingly - version = responseVersion; - } else { - if (responseLastCheckpointedOffset > lastCheckpointedOffset) { - // a new checkpoint happens - int newCheckpointedNum = (int) (responseLastCheckpointedOffset - lastCheckpointedOffset); - for (int i = 0; i < newCheckpointedNum; i++) { - T result = uncheckpointedResults.removeFirst(); - checkpointedResults.add(result); - } - lastCheckpointedOffset = responseLastCheckpointedOffset; - } - - if (!version.equals(responseVersion)) { - // sink has restarted - int removeNum = (int) (offset - lastCheckpointedOffset); - for (int i = 0; i < removeNum; i++) { - uncheckpointedResults.removeLast(); - } - version = responseVersion; - offset = lastCheckpointedOffset; - } - - if (responseResults.size() > 0) { - int addStart = (int) (offset - responseOffset); - 
List<T> resultsToAdd = responseResults.subList(addStart, responseResults.size()); - uncheckpointedResults.addAll(resultsToAdd); - offset += resultsToAdd.size(); - } - } - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java new file mode 100644 index 0000000..aecf864 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestCoordinationRequestHandler.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.collect.utils; + +import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest; +import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse; +import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction; +import org.apache.flink.util.OptionalFailure; + +import org.junit.Assert; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +/** + * A {@link CoordinationRequestHandler} to test fetching SELECT query results. 
+ */ +public class TestCoordinationRequestHandler<T> implements CoordinationRequestHandler { + + private static final int BATCH_SIZE = 3; + + private final TypeSerializer<T> serializer; + private final String accumulatorName; + + private int checkpointCountDown; + + private LinkedList<T> data; + private List<T> checkpointingData; + private List<T> checkpointedData; + + private LinkedList<T> buffered; + private List<T> checkpointingBuffered; + private List<T> checkpointedBuffered; + + private String version; + + private long offset; + private long checkpointingOffset; + private long checkpointedOffset; + + private Map<String, OptionalFailure<Object>> accumulatorResults; + + private Random random; + private boolean closed; + + public TestCoordinationRequestHandler( + List<T> data, + TypeSerializer<T> serializer, + String accumulatorName) { + this.serializer = serializer; + this.accumulatorName = accumulatorName; + + this.checkpointCountDown = 0; + + this.data = new LinkedList<>(data); + this.checkpointedData = new ArrayList<>(data); + + this.buffered = new LinkedList<>(); + this.checkpointedBuffered = new ArrayList<>(); + + this.version = UUID.randomUUID().toString(); + + this.offset = 0; + this.checkpointingOffset = 0; + this.checkpointedOffset = 0; + + this.accumulatorResults = new HashMap<>(); + + this.random = new Random(); + this.closed = false; + } + + @Override + public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) { + if (closed) { + throw new RuntimeException("Handler closed"); + } + + Assert.assertTrue(request instanceof CollectCoordinationRequest); + CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request; + + for (int i = random.nextInt(3) + 1; i > 0; i--) { + if (checkpointCountDown > 0) { + checkpointCountDown--; + if (checkpointCountDown == 0) { + checkpointedData = checkpointingData; + checkpointedBuffered = checkpointingBuffered; + checkpointedOffset = checkpointingOffset; + } 
+ } + + int r = random.nextInt(10); + if (r < 6) { + // with 60% chance we add data + int size = Math.min(data.size(), BATCH_SIZE * 2 - buffered.size()); + if (size > 0) { + size = random.nextInt(size) + 1; + } + for (int j = 0; j < size; j++) { + buffered.add(data.removeFirst()); + } + + if (data.isEmpty()) { + buildAccumulatorResults(); + closed = true; + break; + } + } else if (r < 9) { + // with 30% chance we do a checkpoint completed in the future + if (checkpointCountDown == 0) { + checkpointCountDown = random.nextInt(5) + 1; + checkpointingData = new ArrayList<>(data); + checkpointingBuffered = new ArrayList<>(buffered); + checkpointingOffset = offset; + } + } else { + // with 10% chance we fail + checkpointCountDown = 0; + version = UUID.randomUUID().toString(); + data = new LinkedList<>(checkpointedData); + buffered = new LinkedList<>(checkpointedBuffered); + offset = checkpointedOffset; + } + } + + Assert.assertTrue(offset <= collectRequest.getOffset()); + + List<T> subList = Collections.emptyList(); + if (collectRequest.getVersion().equals(version)) { + while (buffered.size() > 0 && collectRequest.getOffset() > offset) { + buffered.removeFirst(); + offset++; + } + subList = new ArrayList<>(); + Iterator<T> iterator = buffered.iterator(); + for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) { + subList.add(iterator.next()); + } + } + + CoordinationResponse response; + try { + if (random.nextBoolean()) { + // with 50% chance we return valid result + response = new CollectCoordinationResponse<>(version, checkpointedOffset, subList, serializer); + } else { + // with 50% chance we return invalid result + response = new CollectCoordinationResponse<>( + collectRequest.getVersion(), + -1, + Collections.emptyList(), + serializer); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(response); + } + + public boolean isClosed() { + return closed; + } + + public Map<String, OptionalFailure<Object>> 
getAccumulatorResults() { + return accumulatorResults; + } + + private void buildAccumulatorResults() { + List<T> finalResults = new ArrayList<>(buffered); + SerializedListAccumulator<byte[]> listAccumulator = new SerializedListAccumulator<>(); + try { + byte[] serializedResult = + CollectSinkFunction.serializeAccumulatorResult( + offset, version, checkpointedOffset, finalResults, serializer); + listAccumulator.add(serializedResult, BytePrimitiveArraySerializer.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + + accumulatorResults.put(accumulatorName, OptionalFailure.of(listAccumulator.getLocalValue())); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java new file mode 100644 index 0000000..3a40870 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/TestJobClient.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.streaming.api.operators.collect.utils;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.OptionalFailure;

import org.junit.Assert;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * A {@link JobClient} to test fetching SELECT query results.
 *
 * <p>Coordination requests are answered synchronously by the supplied
 * {@link CoordinationRequestHandler}. After every answered request the
 * {@link JobInfoProvider} is polled; once it reports the job as finished, the
 * client switches to {@link JobStatus#FINISHED} and builds the
 * {@link JobExecutionResult} from the provider's accumulator results.
 *
 * <p>The mutable state ({@code jobStatus}, {@code jobExecutionResult}) is
 * accessed without any synchronization.
 */
public class TestJobClient implements JobClient, CoordinationRequestGateway {

    private final JobID jobId;
    private final OperatorID operatorId;
    private final CoordinationRequestHandler handler;
    private final JobInfoProvider infoProvider;

    private JobStatus jobStatus;
    private JobExecutionResult jobExecutionResult;

    /**
     * Creates a client whose job starts in {@link JobStatus#RUNNING} with no
     * execution result.
     *
     * @param jobId id reported by {@link #getJobID()} and used for the execution result
     * @param operatorId the only operator id accepted by {@link #sendCoordinationRequest}
     * @param handler handler that answers the coordination requests
     * @param infoProvider source of the "job finished" signal and of the accumulator results
     */
    public TestJobClient(
            JobID jobId,
            OperatorID operatorId,
            CoordinationRequestHandler handler,
            JobInfoProvider infoProvider) {
        this.jobId = jobId;
        this.operatorId = operatorId;
        this.handler = handler;
        this.infoProvider = infoProvider;

        this.jobStatus = JobStatus.RUNNING;
        this.jobExecutionResult = null;
    }

    @Override
    public JobID getJobID() {
        return jobId;
    }

    /** Returns an already completed future holding the current job status. */
    @Override
    public CompletableFuture<JobStatus> getJobStatus() {
        return CompletableFuture.completedFuture(jobStatus);
    }

    /** Marks the job as {@link JobStatus#CANCELED} and returns a completed future. */
    @Override
    public CompletableFuture<Void> cancel() {
        jobStatus = JobStatus.CANCELED;
        return CompletableFuture.completedFuture(null);
    }

    /** Not supported by this test client. */
    @Override
    public CompletableFuture<String> stopWithSavepoint(
            boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this test client. */
    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory) {
        throw new UnsupportedOperationException();
    }

    /** Not supported by this test client. */
    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns an already completed future holding the execution result; the
     * value is {@code null} until a coordination request has observed the job
     * as finished.
     */
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) {
        return CompletableFuture.completedFuture(jobExecutionResult);
    }

    /**
     * Forwards the request to the handler, waits for its answer and returns it
     * as an already completed future.
     *
     * @param operatorId must equal the operator id given at construction time
     * @param request the request to forward to the handler
     * @return a completed future holding the handler's response
     * @throws RuntimeException if the job is in a globally terminal state, or
     *     if waiting for the handler's response fails or is interrupted
     */
    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
            OperatorID operatorId, CoordinationRequest request) {
        if (jobStatus.isGloballyTerminalState()) {
            throw new RuntimeException("Job terminated");
        }

        Assert.assertEquals(this.operatorId, operatorId);
        CoordinationResponse response;
        try {
            response = handler.handleCoordinationRequest(request).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up the stack
            // can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // The handler may have finished the job while serving this request.
        if (infoProvider.isJobFinished()) {
            jobStatus = JobStatus.FINISHED;
            jobExecutionResult = new JobExecutionResult(jobId, 0, infoProvider.getAccumulatorResults());
        }

        return CompletableFuture.completedFuture(response);
    }

    /**
     * Interface to provide job related info for {@link TestJobClient}.
     */
    public interface JobInfoProvider {

        /** Returns whether the job should be treated as finished. */
        boolean isJobFinished();

        /** Returns the accumulator results used to build the {@link JobExecutionResult}. */
        Map<String, OptionalFailure<Object>> getAccumulatorResults();
    }
}