Repository: flink
Updated Branches:
  refs/heads/master 6e40f5901 -> 9bbb8fab3

[hotfix] Delete leftover (superseded) StreamTaskAsyncCheckpointTest

There is RocksDBAsyncSnapshotTest which tests async snapshots for the
RocksDB state backend. Operators themselves cannot do asynchronous
checkpoints right now.


Branch: refs/heads/master
Commit: 9bbb8fab38daff8eb5679e4aa7151f68e0b12226
Parents: 6e40f59
Author: Aljoscha Krettek <>
Authored: Mon Sep 5 12:25:28 2016 +0200
Committer: Aljoscha Krettek <>
Committed: Mon Sep 5 12:26:38 2016 +0200

 .../tasks/    | 234 -------------------
 1 file changed, 234 deletions(-)
diff --git 
deleted file mode 100644
index 66bc237..0000000
+++ /dev/null
@@ -1,234 +0,0 @@
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.flink.streaming.runtime.tasks;
-//import org.apache.flink.api.common.ExecutionConfig;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-//import org.apache.flink.core.testutils.OneShotLatch;
-//import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-//import org.apache.flink.streaming.api.graph.StreamConfig;
-//import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-//import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-//import org.apache.flink.streaming.api.watermark.Watermark;
-//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.powermock.core.classloader.annotations.PowerMockIgnore;
-//import org.powermock.core.classloader.annotations.PrepareForTest;
-//import org.powermock.modules.junit4.PowerMockRunner;
-//import java.lang.reflect.Field;
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertTrue;
-// * Tests for asynchronous checkpoints.
-// */
-//@PowerMockIgnore({"*", "com.sun.jndi.*"})
-//public class StreamTaskAsyncCheckpointTest {
-//     /**
-//      * This ensures that asynchronous state handles are actually 
materialized asynchonously.
-//      *
-//      * <p>We use latches to block at various stages and see if the code 
still continues through
-//      * the parts that are not asynchronous. If the checkpoint is not done 
asynchronously the
-//      * test will simply lock forever.
-//      * @throws Exception
-//      */
-//     @Test
-//     public void testAsyncCheckpoints() throws Exception {
-//             final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-//             final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-//             final OneInputStreamTask<String, String> task = new 
-//             final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, 
-//             StreamConfig streamConfig = testHarness.getStreamConfig();
-//             streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-//             StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-//                     testHarness.jobConfig,
-//                     testHarness.taskConfig,
-//                     testHarness.memorySize,
-//                     new MockInputSplitProvider(),
-//                     testHarness.bufferSize) {
-//                     @Override
-//                     public ExecutionConfig getExecutionConfig() {
-//                             return testHarness.executionConfig;
-//                     }
-//                     @Override
-//                     public void acknowledgeCheckpoint(long checkpointId) {
-//                             super.acknowledgeCheckpoint(checkpointId);
-//                     }
-//                     @Override
-//                     public void acknowledgeCheckpoint(long checkpointId, 
StateHandle<?> state) {
-//                             super.acknowledgeCheckpoint(checkpointId, 
-//                             // block on the latch, to verify that 
triggerCheckpoint returns below,
-//                             // even though the async checkpoint would not 
-//                             try {
-//                                     delayCheckpointLatch.await();
-//                             } catch (InterruptedException e) {
-//                                     e.printStackTrace();
-//                             }
-//                             assertTrue(state instanceof 
-//                             StreamTaskStateList stateList = 
(StreamTaskStateList) state;
-//                             // should be only one state
-//                             StreamTaskState taskState = 
-//                             StateHandle<?> operatorState = 
-//                             assertTrue("It must be a TestStateHandle", 
operatorState instanceof TestStateHandle);
-//                             TestStateHandle testState = (TestStateHandle) 
-//                             assertEquals(42, testState.checkpointId);
-//                             assertEquals(17, testState.timestamp);
-//                             // we now know that the checkpoint went through
-//                             ensureCheckpointLatch.trigger();
-//                     }
-//             };
-//             testHarness.invoke(mockEnv);
-//             // wait for the task to be running
-//             for (Field field: StreamTask.class.getDeclaredFields()) {
-//                     if (field.getName().equals("isRunning")) {
-//                             field.setAccessible(true);
-//                             while (!field.getBoolean(task)) {
-//                                     Thread.sleep(10);
-//                             }
-//                     }
-//             }
-//             task.triggerCheckpoint(42, 17);
-//             // now we allow the checkpoint
-//             delayCheckpointLatch.trigger();
-//             // wait for the checkpoint to go through
-//             ensureCheckpointLatch.await();
-//             testHarness.endInput();
-//             testHarness.waitForTaskCompletion();
-//     }
-//     // 
-//     public static class AsyncCheckpointOperator
-//             extends AbstractStreamOperator<String>
-//             implements OneInputStreamOperator<String, String> {
-//             @Override
-//             public void processElement(StreamRecord<String> element) throws 
Exception {
-//                     // we also don't care
-//             }
-//             @Override
-//             public void processWatermark(Watermark mark) throws Exception {
-//                     // not interested
-//             }
-//             @Override
-//             public StreamTaskState snapshotOperatorState(final long 
checkpointId, final long timestamp) throws Exception {
-//                     StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
-//                     AsynchronousStateHandle<String> asyncState =
-//                             new 
DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
-//                     taskState.setOperatorState(asyncState);
-//                     return taskState;
-//             }
-//             @Override
-//             public void restoreState(StreamTaskState taskState) throws 
Exception {
-//                     super.restoreState(taskState);
-//             }
-//     }
-//     private static class DataInputViewAsynchronousStateHandle extends 
AsynchronousStateHandle<String> {
-//             private final long checkpointId;
-//             private final long timestamp;
-//             public DataInputViewAsynchronousStateHandle(long checkpointId, 
long timestamp) {
-//                     this.checkpointId = checkpointId;
-//                     this.timestamp = timestamp;
-//             }
-//             @Override
-//             public StateHandle<String> materialize() throws Exception {
-//                     return new TestStateHandle(checkpointId, timestamp);
-//             }
-//             @Override
-//             public long getStateSize() {
-//                     return 0;
-//             }
-//             @Override
-//             public void close() throws IOException {}
-//     }
-//     private static class TestStateHandle implements StateHandle<String> {
-//             public final long checkpointId;
-//             public final long timestamp;
-//             public TestStateHandle(long checkpointId, long timestamp) {
-//                     this.checkpointId = checkpointId;
-//                     this.timestamp = timestamp;
-//             }
-//             @Override
-//             public String getState(ClassLoader userCodeClassLoader) throws 
Exception {
-//                     return null;
-//             }
-//             @Override
-//             public void discardState() throws Exception {}
-//             @Override
-//             public long getStateSize() {
-//                     return 0;
-//             }
-//             @Override
-//             public void close() throws IOException {}
-//     }
-//     public static class DummyMapFunction<T> implements MapFunction<T, T> {
-//             @Override
-//             public T map(T value) { return value; }
-//     }

Reply via email to