[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993100#comment-15993100 ]
ASF GitHub Bot commented on FLINK-5969: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r114347034 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java --- @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hdfstests; + +import java.io.FileOutputStream; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class ContinuousFileProcessingFrom12MigrationTest { + + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 100; + + @ClassRule + public static TemporaryFolder tempFolder = 
new TemporaryFolder(); + + /** + * Manually run this to write binary snapshot data. Remove @Ignore to run. + */ + @Ignore + @Test + public void writeReaderSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + final OneShotLatch latch = new OneShotLatch(); + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>( + format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + testHarness.open(); + // create some state in the reader + testHarness.processElement(new StreamRecord<>(split1)); + testHarness.processElement(new StreamRecord<>(split2)); + testHarness.processElement(new StreamRecord<>(split3)); + testHarness.processElement(new StreamRecord<>(split4)); + // take a snapshot of the operator's state. This will be used + // to initialize another reader and compare the results of the + // two operators. 
+ + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot"); + } + + @Test + public void testReaderRestore() throws Exception { + File testFolder = tempFolder.newFolder(); + + final OneShotLatch latch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + + testHarness.setup(); + OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename( + "reader-migration-test-flink1.2-snapshot")); + testHarness.initializeState(operatorStateHandles); + testHarness.open(); + + latch.trigger(); + + // ... 
and wait for the operators to close gracefully + + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + // compare if the results contain what they should contain and also if + // they are the same, as they should. + + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3))); + Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4))); + } + + /** + * Manually run this to write binary snapshot data. Remove @Ignore to run. 
+ */ + @Ignore + @Test + public void writeMonitoringSourceSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + File path = null; + long fileModTime = Long.MIN_VALUE; + for (int i = 0; i < 1; i++) { + Tuple2<File, String> file = createFileAndFillWithData(testFolder.getAbsolutePath(), "file", i, "This is test line."); + path = file.f0; + fileModTime = path.lastModified(); + } + + TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); + + final ContinuousFileMonitoringFunction<String> monitoringFunction = + new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); + + StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = + new StreamSource<>(monitoringFunction); + + final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = + new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + + testHarness.open(); + + final Throwable[] error = new Throwable[1]; + + final OneShotLatch latch = new OneShotLatch(); + + // run the source asynchronously + Thread runner = new Thread() { + @Override + public void run() { + try { + monitoringFunction.run(new DummySourceContext() { + @Override + public void collect(TimestampedFileInputSplit element) { + latch.trigger(); + } + }); + } + catch (Throwable t) { + t.printStackTrace(); + error[0] = t; + } + } + }; + runner.start(); + + if (!latch.isTriggered()) { + latch.await(); + } + + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle( + snapshot, + "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.2-snapshot"); + + monitoringFunction.cancel(); + runner.join(); + + testHarness.close(); + } + + @Test + public void testMonitoringSourceRestore() throws Exception { + + File testFolder = tempFolder.newFolder(); + + Long expectedModTime = 
Long.parseLong("1493116191000"); + TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); + + final ContinuousFileMonitoringFunction<String> monitoringFunction = + new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); + + StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src = + new StreamSource<>(monitoringFunction); + + final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness = + new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0); + + testHarness.setup(); + OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename( + "monitoring-function-migration-test-1493116191000-flink1.2-snapshot")); + + testHarness.initializeState(operatorStateHandles); + testHarness.open(); + + Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime()); + + } + + private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> { + + private static final long serialVersionUID = -6727603565381560267L; + + private final OneShotLatch latch; + + private FileInputSplit split; + + private boolean reachedEnd; + + BlockingFileInputFormat(OneShotLatch latch, Path filePath) { + super(filePath); + this.latch = latch; + this.reachedEnd = false; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.reachedEnd = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if (!latch.isTriggered()) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return reachedEnd; + } + + @Override + public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { + this.reachedEnd = true; + return split; + } + + @Override + public void close() { + + } + } + + private static abstract class DummySourceContext + implements 
SourceFunction.SourceContext<TimestampedFileInputSplit> { + + private final Object lock = new Object(); + + @Override + public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) { + } + + @Override + public void emitWatermark(Watermark mark) { + } + + @Override + public Object getCheckpointLock() { + return lock; + } + + @Override + public void close() { + } + } + + /** + * Create a file with pre-determined String format of the form: + * {@code fileIdx +": "+ sampleLine +" "+ lineNo}. + * */ + private Tuple2<File, String> createFileAndFillWithData( + String base, String fileName, int fileIdx, String sampleLine) throws IOException { --- End diff -- The line below has a `.` before fileName (tmp variable), and I was trying to cover both cases with 1 comment by including `[.]` as some kind of optional part of the name. > Add savepoint backwards compatibility tests from 1.2 to 1.3 > ----------------------------------------------------------- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests > Affects Versions: 1.3.0 > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)