http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java new file mode 100644 index 0000000..cf2c373 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -0,0 +1,1082 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.commons.lang3.time.StopWatch; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; +import org.apache.flink.streaming.connectors.fs.StringWriter; +import org.apache.flink.streaming.connectors.fs.Writer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.Iterator; + +/** + * Sink that emits its input elements to {@link 
org.apache.hadoop.fs.FileSystem} files within + * buckets. This is integrated with the checkpointing mechanism to provide exactly-once semantics. + * + * <p> + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * one for each parallel subtask of the sink. These part files contain the actual output data. + * + * <p> + * The sink uses a {@link Bucketer} to determine in which bucket directory inside the base directory each element should + * be written. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For example, use the + * {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files + * in a fault-tolerant way. + * + * <p> + * The filenames of the part files contain the part prefix, the parallel subtask index of the sink + * and a rolling counter. For example, the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} part file created by that subtask. By default, + * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}. + * When a part file becomes bigger than the user-specified batch size the current part file is closed, + * the part counter is increased and a new part file is created. The batch size defaults to {@code 384 MB} and + * can be configured using {@link #setBatchSize(long)}. + * + * <p> + * In some scenarios, the open buckets are required to change based on time. In these cases, the sink + * needs to determine when a bucket has become inactive, in order to flush and close the part file. + * To support this there are two configurable settings: + * <ol> + * <li>the frequency to check for inactive buckets, configured by {@link #setInactiveBucketCheckInterval(long)}, + * and</li> + * <li>the minimum amount of time a bucket must go without receiving any data before it is considered inactive, + * configured by {@link #setInactiveBucketThreshold(long)}</li> + * </ol> + * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}. + * + * <p> + * Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}. + * These states reflect how the sink works together with the checkpointing mechanism to provide exactly-once + * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once + * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently + * pending files will be moved to {@code finished}. + * + * <p> + * In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it + * had when the last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending} + * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that + * they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports + * the {@code truncate()} method this will be used to reset the file back to its previous state.
If not, a special + * file with the same name as the part file and the suffix {@code ".valid-length"} will be created that contains the + * length up to which the file contains valid data. When reading the file, it must be ensured that it is only read up + * to that point. The prefixes and suffixes for the different file states and valid-length files can be configured + * using the appropriate setter method, e.g. {@link #setPendingSuffix(String)}. + * + * <p> + * <b>NOTE:</b> + * <ol> + * <li> + * If checkpointing is not enabled the pending files will never be moved to the finished state. In that case, + * the pending suffix/prefix can be set to {@code ""} to make the sink work in a non-fault-tolerant way but + * still provide output without prefixes and suffixes. + * </li> + * <li> + * The part files are written using an instance of {@link Writer}. By default, a + * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result + * of {@code toString()} for every element, separated by newlines. You can configure the writer using + * {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} + * can be used to write Hadoop {@code SequenceFiles}. + * </li> + * </ol> + * + * <p> + * Example: + * <pre>{@code + * new BucketingSink<Tuple2<IntWritable, Text>>(outPath) + * .setWriter(new SequenceFileWriter<IntWritable, Text>()) + * .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")) + * }</pre> + * + * This will create a sink that writes to {@code SequenceFiles} and rolls every minute. + * + * @see DateTimeBucketer + * @see StringWriter + * @see SequenceFileWriter + * + * @param <T> Type of the elements emitted by this sink + */ +public class BucketingSink<T> + extends RichSinkFunction<T> + implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class); + + // -------------------------------------------------------------------------------------------- + // User configuration values + // -------------------------------------------------------------------------------------------- + // These are initialized with some defaults but are meant to be changeable by the user + + /** + * The default maximum size of part files (currently {@code 384 MB}). + */ + private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L; + + /** + * The default time between checks for inactive buckets. By default, {@code 60} seconds. + */ + private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L; + + /** + * The default threshold (in {@code ms}) for marking a bucket as inactive and + * closing its part files. By default, {@code 60} seconds. + */ + private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L; + + /** + * The suffix for {@code in-progress} part files. These are files we are + * currently writing to, but which were not yet confirmed by a checkpoint. + */ + private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress"; + + /** + * The prefix for {@code in-progress} part files. These are files we are + * currently writing to, but which were not yet confirmed by a checkpoint. + */ + private final String DEFAULT_IN_PROGRESS_PREFIX = "_"; + + /** + * The suffix for {@code pending} part files. These are closed files that we are + * not currently writing to (inactive or reached {@link #batchSize}), but which + * were not yet confirmed by a checkpoint.
+ */ + private final String DEFAULT_PENDING_SUFFIX = ".pending"; + + /** + * The prefix for {@code pending} part files. These are closed files that we are + * not currently writing to (inactive or reached {@link #batchSize}), but which + * were not yet confirmed by a checkpoint. + */ + private final String DEFAULT_PENDING_PREFIX = "_"; + + /** + * When {@code truncate()} is not supported by the used {@link FileSystem}, we create + * a file along the part file with this suffix that contains the length up to which + * the part file is valid. + */ + private final String DEFAULT_VALID_SUFFIX = ".valid-length"; + + /** + * When {@code truncate()} is not supported by the used {@link FileSystem}, we create + * a file along the part file with this prefix that contains the length up to which + * the part file is valid. + */ + private final String DEFAULT_VALID_PREFIX = "_"; + + /** + * The default prefix for part files. + */ + private final String DEFAULT_PART_PREFIX = "part"; + + /** + * The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}). + */ + private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000L; + + + /** + * The base {@code Path} that stores all bucket directories. + */ + private final String basePath; + + /** + * The {@code Bucketer} that is used to determine the path of bucket directories. + */ + private Bucketer<T> bucketer; + + /** + * We have a template and call duplicate() for each bucket to get the actual + * writer that is used for the part files. + */ + private Writer<T> writerTemplate; + + private long batchSize = DEFAULT_BATCH_SIZE; + private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS; + private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS; + + // These are the actually configured prefixes/suffixes + private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX; + private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX; + + private String pendingSuffix = DEFAULT_PENDING_SUFFIX; + private String pendingPrefix = DEFAULT_PENDING_PREFIX; + + private String validLengthSuffix = DEFAULT_VALID_SUFFIX; + private String validLengthPrefix = DEFAULT_VALID_PREFIX; + + private String partPrefix = DEFAULT_PART_PREFIX; + + /** + * The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}). + */ + private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS; + + // -------------------------------------------------------------------------------------------- + // Internal fields (not configurable by user) + // -------------------------------------------------------------------------------------------- + + /** + * We use reflection to get the .truncate() method; this is only available starting with Hadoop 2.7. + */ + private transient Method refTruncate; + + /** + * The state object that is handled by Flink for snapshot/restore. This contains state for + * every open bucket: the current in-progress part file path, its valid length and the pending part files. + */ + private transient State<T> state; + + private transient ListState<State<T>> restoredBucketStates; + + /** + * User-defined FileSystem parameters. + */ + private Configuration fsConfig; + + /** + * The FileSystem reference. + */ + private transient FileSystem fs; + + private transient Clock clock; + + private transient ProcessingTimeService processingTimeService; + + /** + * Creates a new {@code BucketingSink} that writes files to the given base directory.
+ * + * <p> + * This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} as writer. + * The maximum bucket size is set to 384 MB. + * + * @param basePath The directory to which to write the bucket files. + */ + public BucketingSink(String basePath) { + this.basePath = basePath; + this.bucketer = new DateTimeBucketer<>(); + this.writerTemplate = new StringWriter<>(); + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public BucketingSink<T> setFSConfig(Configuration config) { + this.fsConfig = new Configuration(); + fsConfig.addAll(config); + return this; + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) { + this.fsConfig = new Configuration(); + for (Map.Entry<String, String> entry : config) { + fsConfig.setString(entry.getKey(), entry.getValue()); + } + return this; + } + + @Override + @SuppressWarnings("unchecked") + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + if (this.writerTemplate instanceof InputTypeConfigurable) { + ((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized."); + + initFileSystem(); + + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + OperatorStateStore stateStore = context.getOperatorStateStore(); + restoredBucketStates = stateStore.getSerializableListState("bucket-states"); + + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); + + for (State<T> recoveredState : restoredBucketStates.get()) { + handleRestoredBucketState(recoveredState); + if (LOG.isDebugEnabled()) { + LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState); + } + } + } else { + LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); + } + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + state = new State<>(); + + processingTimeService = + ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); + + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); + + processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); + + this.clock = new Clock() { + @Override + public long currentTimeMillis() { + return processingTimeService.getCurrentProcessingTime(); + } + }; + } + + /** + * Creates a file system with the user-defined {@code HDFS} configuration.
+ * @throws IOException + */ + private void initFileSystem() throws IOException { + if (fs != null) { + return; + } + org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if (fsConfig != null) { + String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : fsConfig.keySet()) { + hadoopConf.set(key, fsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + + @Override + public void close() throws Exception { + for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { + closeCurrentPartFile(entry.getValue()); + } + } + + @Override + public void invoke(T value) throws Exception { + Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value); + + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); + + BucketState<T> bucketState = state.getBucketState(bucketPath); + if (bucketState == null) { + bucketState = new BucketState<>(currentProcessingTime); + state.addBucketState(bucketPath, bucketState); + } + + if (shouldRoll(bucketState)) { + openNewPartFile(bucketPath, bucketState); + } + + bucketState.writer.write(value); + bucketState.lastWrittenToTime = currentProcessingTime; + } + + /** + * Returns {@code true} if the current {@code part-file} should be closed and a new should be created. + * This happens if: + * <ol> + * <li>no file is created yet for the task to write to, or</li> + * <li>the current file has reached the maximum bucket size.</li> + * </ol> + */ + private boolean shouldRoll(BucketState<T> bucketState) throws IOException { + boolean shouldRoll = false; + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (!bucketState.isWriterOpen) { + shouldRoll = true; + LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex); + } else { + long writePosition = bucketState.writer.getPos(); + if (writePosition > batchSize) { + shouldRoll = true; + LOG.debug( + "BucketingSink {} starting new bucket because file position {} is above batch size {}.", + subtaskIndex, + writePosition, + batchSize); + } + } + return shouldRoll; + } + + @Override + public void onProcessingTime(long timestamp) throws Exception { + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); + + checkForInactiveBuckets(currentProcessingTime); + + processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); + } + + /** + * Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been + * written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress + * files to be moved to the pending state and be finalised on the next checkpoint. + */ + private void checkForInactiveBuckets(long currentProcessingTime) throws Exception { + + synchronized (state.bucketStates) { + for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { + if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) { + LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.", + getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold); + closeCurrentPartFile(entry.getValue()); + } + } + } + } + + /** + * Closes the current part file and opens a new one with a new bucket path, as returned by the + * {@link Bucketer}. 
If the bucket is not new, then this will create a new file with the same path + * as its predecessor, but with an increased rolling counter (see {@link BucketingSink}. + */ + private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception { + closeCurrentPartFile(bucketState); + + if (!fs.exists(bucketPath)) { + try { + if (fs.mkdirs(bucketPath)) { + LOG.debug("Created new bucket directory: {}", bucketPath); + } + } catch (IOException e) { + throw new RuntimeException("Could not create new bucket path.", e); + } + } + + // The following loop tries different partCounter values in ascending order until it reaches the minimum + // that is not yet used. This works since there is only one parallel subtask that tries names with this + // subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now + // clean the base directory in case of rescaling. + + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); + while (fs.exists(partPath) || + fs.exists(getPendingPathFor(partPath)) || + fs.exists(getInProgressPathFor(partPath))) { + bucketState.partCounter++; + partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); + } + + // increase, so we don't have to check for this name next time + bucketState.partCounter++; + + LOG.debug("Next part path is {}", partPath.toString()); + bucketState.currentFile = partPath.toString(); + + Path inProgressPath = getInProgressPathFor(partPath); + if (bucketState.writer == null) { + bucketState.writer = writerTemplate.duplicate(); + } + + bucketState.writer.open(fs, inProgressPath); + bucketState.isWriterOpen = true; + } + + /** + * Closes the current part file and moves it from the in-progress state to the pending state. + */ + private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception { + if (bucketState.isWriterOpen) { + bucketState.writer.close(); + bucketState.isWriterOpen = false; + } + + if (bucketState.currentFile != null) { + Path currentPartPath = new Path(bucketState.currentFile); + Path inProgressPath = getInProgressPathFor(currentPartPath); + Path pendingPath = getPendingPathFor(currentPartPath); + + fs.rename(inProgressPath, pendingPath); + LOG.debug("Moving in-progress bucket {} to pending file {}", + inProgressPath, + pendingPath); + bucketState.pendingFiles.add(currentPartPath.toString()); + bucketState.currentFile = null; + } + } + + /** + * Gets the truncate() call using reflection. + * <p> + * <b>NOTE:</b> This code comes from Flume. + */ + private Method reflectTruncate(FileSystem fs) { + Method m = null; + if(fs != null) { + Class<?> fsClass = fs.getClass(); + try { + m = fsClass.getMethod("truncate", Path.class, long.class); + } catch (NoSuchMethodException ex) { + LOG.debug("Truncate not found. 
Will write a file with suffix '{}' " + + " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); + return null; + } + + // verify that truncate actually works + FSDataOutputStream outputStream; + Path testPath = new Path(UUID.randomUUID().toString()); + try { + outputStream = fs.create(testPath); + outputStream.writeUTF("hello"); + outputStream.close(); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works.", e); + } + + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } + + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file.", e); + } + } + return m; + } + + private Path getPendingPathFor(Path path) { + return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); + } + + private Path getInProgressPathFor(Path path) { + return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); + } + + private Path getValidLengthPathFor(Path path) { + return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + synchronized (state.bucketStates) { + + Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator(); + while (bucketStatesIt.hasNext()) { + BucketState<T> bucketState = bucketStatesIt.next().getValue(); + synchronized (bucketState.pendingFilesPerCheckpoint) { + + Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt = + bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); + + while (pendingCheckpointsIt.hasNext()) { + + Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next(); + Long pastCheckpointId = entry.getKey(); + List<String> pendingPaths = entry.getValue(); + + if (pastCheckpointId <= checkpointId) { + LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); + + for (String filename : pendingPaths) { + Path finalPath = new Path(filename); + Path pendingPath = getPendingPathFor(finalPath); + + fs.rename(pendingPath, finalPath); + LOG.debug( + "Moving pending file {} to final location having completed checkpoint {}.", + pendingPath, + pastCheckpointId); + } + pendingCheckpointsIt.remove(); + } + } + + if (!bucketState.isWriterOpen && + bucketState.pendingFiles.isEmpty() && + bucketState.pendingFilesPerCheckpoint.isEmpty()) { + + // We've dealt with all the pending files and the writer for this bucket is not currently open. + // Therefore this bucket is currently inactive and we can remove it from our state. 
+ bucketStatesIt.remove(); + } + } + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); + + restoredBucketStates.clear(); + + synchronized (state.bucketStates) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { + BucketState<T> bucketState = bucketStateEntry.getValue(); + + if (bucketState.isWriterOpen) { + bucketState.currentFileValidLength = bucketState.writer.flush(); + } + + synchronized (bucketState.pendingFilesPerCheckpoint) { + bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); + } + bucketState.pendingFiles = new ArrayList<>(); + } + restoredBucketStates.add(state); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state); + } + } + } + + private void handleRestoredBucketState(State<T> restoredState) { + Preconditions.checkNotNull(restoredState); + + for (BucketState<T> bucketState : restoredState.bucketStates.values()) { + + // we can clean all the pending files since they were renamed to + // final files after this checkpoint was successful + // (we re-start from the last **successful** checkpoint) + bucketState.pendingFiles.clear(); + + if (bucketState.currentFile != null) { + + // We were writing to a file when the last checkpoint occurred. This file can either + // be still in-progress or became a pending file at some point after the checkpoint. + // Either way, we have to truncate it back to a valid state (or write a .valid-length + // file that specifies up to which length it is valid) and rename it to the final name + // before starting a new bucket file. + + Path partPath = new Path(bucketState.currentFile); + try { + Path partPendingPath = getPendingPathFor(partPath); + Path partInProgressPath = getInProgressPathFor(partPath); + + if (fs.exists(partPendingPath)) { + LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); + // has been moved to pending in the mean time, rename to final location + fs.rename(partPendingPath, partPath); + } else if (fs.exists(partInProgressPath)) { + LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath); + // it was still in progress, rename to final path + fs.rename(partInProgressPath, partPath); + } else if (fs.exists(partPath)) { + LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath); + } else { + LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " + + "it was moved to final location by a previous snapshot restore", bucketState.currentFile); + } + + // We use reflection to get the .truncate() method, this + // is only available starting with Hadoop 2.7 + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + // truncate it or write a ".valid-length" file to specify up to which point it is valid + if (refTruncate != null) { + LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength); + // some-one else might still hold the lease from a previous try, we are + // recovering, after all ... 
+ if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + LOG.debug("Trying to recover file lease {}", partPath); + dfs.recoverLease(partPath); + boolean isclosed = dfs.isFileClosed(partPath); + StopWatch sw = new StopWatch(); + sw.start(); + while (!isclosed) { + if (sw.getTime() > asyncTimeout) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it + } + isclosed = dfs.isFileClosed(partPath); + } + } + Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength); + if (!truncated) { + LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath); + + // we must wait for the asynchronous truncate operation to complete + StopWatch sw = new StopWatch(); + sw.start(); + long newLen = fs.getFileStatus(partPath).getLen(); + while (newLen != bucketState.currentFileValidLength) { + if (sw.getTime() > asyncTimeout) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it + } + newLen = fs.getFileStatus(partPath).getLen(); + } + if (newLen != bucketState.currentFileValidLength) { + throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + "."); + } + } + } else { + LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); + Path validLengthFilePath = getValidLengthPathFor(partPath); + if (!fs.exists(validLengthFilePath)) { + FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); + lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); + lengthFileOut.close(); + } + } + + // Now that we've restored the bucket to a valid state, reset the current file info + bucketState.currentFile = null; + bucketState.currentFileValidLength = -1; + bucketState.isWriterOpen = false; + } catch (IOException e) { + LOG.error("Error while restoring BucketingSink state.", e); + throw new RuntimeException("Error while restoring BucketingSink state.", e); + } catch (InvocationTargetException | IllegalAccessException e) { + LOG.error("Could not invoke truncate.", e); + throw new RuntimeException("Could not invoke truncate.", e); + } + } + + // Move files that are confirmed by a checkpoint but did not get moved to final location + // because the checkpoint notification did not happen before a failure + + LOG.debug("Moving pending files to final location on restore."); + + Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); + for (Long pastCheckpointId : pastCheckpointIds) { + // All the pending files are buckets that have been completed but are waiting to be renamed + // to their final name + for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { + Path finalPath = new Path(filename); + Path pendingPath = getPendingPathFor(finalPath); + + try { + if (fs.exists(pendingPath)) { + LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId); + fs.rename(pendingPath, finalPath); + } + } catch (IOException e) { + LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e); + throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e); + } + } + } + + synchronized (bucketState.pendingFilesPerCheckpoint) { + 
bucketState.pendingFilesPerCheckpoint.clear(); + } + } + } + + // -------------------------------------------------------------------------------------------- + // Setters for User configuration values + // -------------------------------------------------------------------------------------------- + + /** + * Sets the maximum bucket size in bytes. + * + * <p> + * When a bucket part file becomes larger than this size a new bucket part file is started and + * the old one is closed. The name of the bucket files depends on the {@link Bucketer}. + * + * @param batchSize The bucket part file size in bytes. + */ + public BucketingSink<T> setBatchSize(long batchSize) { + this.batchSize = batchSize; + return this; + } + + /** + * Sets the time between checks for inactive buckets. + * + * @param interval The timeout, in milliseconds. + */ + public BucketingSink<T> setInactiveBucketCheckInterval(long interval) { + this.inactiveBucketCheckInterval = interval; + return this; + } + + /** + * Sets the threshold for marking a bucket as inactive and closing its part files. + * Buckets which haven't been written to for at least this period of time become inactive. + * + * @param threshold The timeout, in milliseconds. + */ + public BucketingSink<T> setInactiveBucketThreshold(long threshold) { + this.inactiveBucketThreshold = threshold; + return this; + } + + /** + * Sets the {@link Bucketer} to use for determining the bucket directories to write to. + * + * @param bucketer The bucketer to use. + */ + public BucketingSink<T> setBucketer(Bucketer<T> bucketer) { + this.bucketer = bucketer; + return this; + } + + /** + * Sets the {@link Writer} to be used for writing the incoming elements to bucket files. + * + * @param writer The {@code Writer} to use. + */ + public BucketingSink<T> setWriter(Writer<T> writer) { + this.writerTemplate = writer; + return this; + } + + /** + * Sets the suffix of in-progress part files. The default is {@code ".in-progress"}. + */ + public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) { + this.inProgressSuffix = inProgressSuffix; + return this; + } + + /** + * Sets the prefix of in-progress part files. The default is {@code "_"}. + */ + public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) { + this.inProgressPrefix = inProgressPrefix; + return this; + } + + /** + * Sets the suffix of pending part files. The default is {@code ".pending"}. + */ + public BucketingSink<T> setPendingSuffix(String pendingSuffix) { + this.pendingSuffix = pendingSuffix; + return this; + } + + /** + * Sets the prefix of pending part files. The default is {@code "_"}. + */ + public BucketingSink<T> setPendingPrefix(String pendingPrefix) { + this.pendingPrefix = pendingPrefix; + return this; + } + + /** + * Sets the suffix of valid-length files. The default is {@code ".valid-length"}. + */ + public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) { + this.validLengthSuffix = validLengthSuffix; + return this; + } + + /** + * Sets the prefix of valid-length files. The default is {@code "_"}. + */ + public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) { + this.validLengthPrefix = validLengthPrefix; + return this; + } + + /** + * Sets the prefix of part files. The default is {@code "part"}. + */ + public BucketingSink<T> setPartPrefix(String partPrefix) { + this.partPrefix = partPrefix; + return this; + } + + /** + * Disable cleanup of leftover in-progress/pending files when the sink is opened.
+ * + * <p> + * This should only be disabled when using the sink without checkpoints, so as not to remove + * the files already in the directory. + * + * @deprecated This option is deprecated and remains only for backwards compatibility. + * We do not clean up lingering files anymore. + */ + @Deprecated + public BucketingSink<T> disableCleanupOnOpen() { + return this; + } + + /** + * Sets the timeout for asynchronous operations such as recoverLease and truncate. + * + * @param timeout The timeout, in milliseconds. + */ + public BucketingSink<T> setAsyncTimeout(long timeout) { + this.asyncTimeout = timeout; + return this; + } + + // -------------------------------------------------------------------------------------------- + // Internal Classes + // -------------------------------------------------------------------------------------------- + + /** + * This is used during snapshot/restore to keep track of in-progress buckets. + * For each bucket, we maintain a state. + */ + static final class State<T> implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * For every bucket directory (key), we maintain a bucket state (value). + */ + final Map<String, BucketState<T>> bucketStates = new HashMap<>(); + + void addBucketState(Path bucketPath, BucketState<T> state) { + synchronized (bucketStates) { + bucketStates.put(bucketPath.toString(), state); + } + } + + BucketState<T> getBucketState(Path bucketPath) { + synchronized (bucketStates) { + return bucketStates.get(bucketPath.toString()); + } + } + + @Override + public String toString() { + return bucketStates.toString(); + } + } + + /** + * This is used for keeping track of the current in-progress buckets and files that we mark + * for moving from pending to final location after we get a checkpoint-complete notification. + */ + static final class BucketState<T> implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * The file that was in-progress when the last checkpoint occurred. + */ + String currentFile; + + /** + * The valid length of the in-progress file at the time of the last checkpoint. + */ + long currentFileValidLength = -1; + + /** + * The time this bucket was last written to. + */ + long lastWrittenToTime; + + /** + * Pending files that accumulated since the last checkpoint. + */ + List<String> pendingFiles = new ArrayList<>(); + + /** + * When doing a checkpoint we move the pending files since the last checkpoint to this map + * with the id of the checkpoint. When we get the checkpoint-complete notification we move + * pending files of completed checkpoints to their final location. + */ + final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>(); + + /** + * For counting the part files inside a bucket directory. Part files follow the pattern + * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter. + */ + private transient int partCounter; + + /** + * Tracks if the writer is currently opened or closed. + */ + private transient boolean isWriterOpen; + + /** + * The actual writer that we use for writing the part files.
+ */ + private transient Writer<T> writer; + + @Override + public String toString() { + return + "In-progress=" + currentFile + + " validLength=" + currentFileValidLength + + " pendingForNextCheckpoint=" + pendingFiles + + " pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint + + " lastModified@" + lastWrittenToTime; + } + + BucketState(long lastWrittenToTime) { + this.lastWrittenToTime = lastWrittenToTime; + } + } +}
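A minimal wiring sketch for the sink above (illustrative only, not part of this patch). It assumes an existing, checkpoint-enabled StreamExecutionEnvironment named env and a DataStream<String> named input; the base path and the sizes are placeholder values, and the setters shown are the ones defined in BucketingSink above.

    // Sketch: wiring a BucketingSink into a streaming job (assumed names: env, input).
    env.enableCheckpointing(5000); // without checkpointing, pending files are never moved to finished

    BucketingSink<String> sink = new BucketingSink<String>("hdfs:///base/path")
        .setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")) // one bucket directory per hour
        .setWriter(new StringWriter<String>())                       // writes toString() of each element
        .setBatchSize(1024L * 1024L * 128L)                          // roll the part file at 128 MB
        .setInactiveBucketCheckInterval(60 * 1000L)
        .setInactiveBucketThreshold(60 * 1000L);

    input.addSink(sink);

As the class Javadoc notes, checkpointing is what drives the pending-to-finished transition; the sketch enables it explicitly for that reason.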
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java new file mode 100644 index 0000000..b985e14 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs.bucketing; + +import org.apache.flink.streaming.connectors.fs.Clock; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * A {@link Bucketer} that assigns to buckets based on current system time. + * + * <p> + * The {@code DateTimeBucketer} will create directories of the following form: + * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path + * that was specified as a base path when creating the + * {@link BucketingSink}. The {@code dateTimePath} + * is determined based on the current system time and the user provided format string. + * + * <p> + * {@link SimpleDateFormat} is used to derive a date string from the current system time and + * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling + * files will have a granularity of hours. + * + * + * <p> + * Example: + * + * <pre>{@code + * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH"); + * }</pre> + * + * This will create for example the following bucket path: + * {@code /base/1976-12-31-14/} + * + */ +public class DateTimeBucketer<T> implements Bucketer<T> { + + private static final long serialVersionUID = 1L; + + private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; + + private final String formatString; + + private transient SimpleDateFormat dateFormatter; + + /** + * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. + */ + public DateTimeBucketer() { + this(DEFAULT_FORMAT_STRING); + } + + /** + * Creates a new {@code DateTimeBucketer} with the given date/time format string. + * + * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine + * the bucket path. 
+ */ + public DateTimeBucketer(String formatString) { + this.formatString = formatString; + + this.dateFormatter = new SimpleDateFormat(formatString); + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + this.dateFormatter = new SimpleDateFormat(formatString); + } + + @Override + public Path getBucketPath(Clock clock, Path basePath, T element) { + String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); + return new Path(basePath + "/" + newDateTimeString); + } + + @Override + public String toString() { + return "DateTimeBucketer{" + + "formatString='" + formatString + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties new file mode 100644 index 0000000..fe60d94 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java new file mode 100644 index 0000000..36c0d03 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.fs; + +import com.google.common.collect.Sets; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; +import org.apache.flink.util.NetUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}. + * + * <p> + * This test only verifies the exactly once behaviour of the sink. Another test tests the + * rolling behaviour. + * + * @deprecated should be removed with the {@link RollingSink}. 
+ */ +@Deprecated +public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase { + + final long NUM_STRINGS = 16_000; + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); + + private static MiniDFSCluster hdfsCluster; + private static org.apache.hadoop.fs.FileSystem dfs; + + private static String outPath; + + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + + @BeforeClass + public static void createHDFS() throws IOException { + Configuration conf = new Configuration(); + + File dataDir = tempFolder.newFolder(); + + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + hdfsCluster = builder.build(); + + dfs = hdfsCluster.getFileSystem(); + + outPath = "hdfs://" + + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) + + "/string-non-rolling-out"; + } + + @AfterClass + public static void destroyHDFS() { + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + } + + @Override + public void testProgram(StreamExecutionEnvironment env) { + assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); + + int PARALLELISM = 12; + + env.enableCheckpointing(20); + env.setParallelism(PARALLELISM); + env.disableOperatorChaining(); + + DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain(); + + DataStream<String> mapped = stream + .map(new OnceFailingIdentityMapper(NUM_STRINGS)); + + RollingSink<String> sink = new RollingSink<String>(outPath) + .setBucketer(new NonRollingBucketer()) + .setBatchSize(10000) + .setValidLengthPrefix("") + .setPendingPrefix("") + .setPendingSuffix(PENDING_SUFFIX) + .setInProgressSuffix(IN_PROGRESS_SUFFIX); + + mapped.addSink(sink); + + } + + @Override + public void postSubmit() throws Exception { + // We read the files and verify that we have read all the strings. If a valid-length + // file exists we only read the file to that point. (This test should work with + // FileSystems that support truncate() and with others as well.) + + Pattern messageRegex = Pattern.compile("message (\\d*)"); + + // Keep a set of the message IDs that we read. The size must equal the read count and + // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some + // elements twice. 
+ Set<Integer> readNumbers = Sets.newHashSet(); + + HashSet<String> uniqMessagesRead = new HashSet<>(); + HashSet<String> messagesInCommittedFiles = new HashSet<>(); + + RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path( + outPath), true); + + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + + if (!file.getPath().toString().endsWith(".valid-length")) { + int validLength = (int) file.getLen(); + if (dfs.exists(file.getPath().suffix(".valid-length"))) { + FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length")); + String validLengthString = inStream.readUTF(); + validLength = Integer.parseInt(validLengthString); + System.out.println("VALID LENGTH: " + validLength); + } + FSDataInputStream inStream = dfs.open(file.getPath()); + byte[] buffer = new byte[validLength]; + inStream.readFully(0, buffer, 0, validLength); + inStream.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + + InputStreamReader inStreamReader = new InputStreamReader(bais); + BufferedReader br = new BufferedReader(inStreamReader); + + String line = br.readLine(); + while (line != null) { + Matcher matcher = messageRegex.matcher(line); + if (matcher.matches()) { + uniqMessagesRead.add(line); + + // check that in the committed files there are no duplicates + if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) { + if (!messagesInCommittedFiles.add(line)) { + Assert.fail("Duplicate entry in committed bucket."); + } + } + + int messageId = Integer.parseInt(matcher.group(1)); + readNumbers.add(messageId); + } else { + Assert.fail("Read line does not match expected pattern."); + } + line = br.readLine(); + } + br.close(); + inStreamReader.close(); + bais.close(); + } + } + + // Verify that we read all strings (at-least-once) + Assert.assertEquals(NUM_STRINGS, readNumbers.size()); + + // Verify that we don't have duplicates (boom!, exactly-once) + Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size()); + } + + private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> { + private static final long serialVersionUID = 1L; + + private static volatile boolean hasFailed = false; + + private final long numElements; + + private long failurePos; + private long count; + + + OnceFailingIdentityMapper(long numElements) { + this.numElements = numElements; + } + + @Override + public void open(org.apache.flink.configuration.Configuration parameters) throws IOException { + long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); + long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); + + failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; + count = 0; + } + + @Override + public String map(String value) throws Exception { + count++; + if (!hasFailed && count >= failurePos) { + hasFailed = true; + throw new Exception("Test Failure"); + } + + return value; + } + } + + private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> + implements CheckpointedAsynchronously<Integer> { + + private static final long serialVersionUID = 1L; + + private final long numElements; + + private int index; + + private volatile boolean isRunning = true; + + + StringGeneratingSourceFunction(long numElements) { + this.numElements = numElements; + } + + @Override + public void run(SourceContext<String> ctx) throws Exception { + final Object 
lockingObject = ctx.getCheckpointLock(); + + final int step = getRuntimeContext().getNumberOfParallelSubtasks(); + + if (index == 0) { + index = getRuntimeContext().getIndexOfThisSubtask(); + } + + while (isRunning && index < numElements) { + + Thread.sleep(1); + synchronized (lockingObject) { + ctx.collect("message " + index); + index += step; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + private static String randomString(StringBuilder bld, Random rnd) { + final int len = rnd.nextInt(10) + 5; + + for (int i = 0; i < len; i++) { + char next = (char) (rnd.nextInt(20000) + 33); + bld.append(next); + } + + return bld.toString(); + } + + @Override + public Integer snapshotState(long checkpointId, long checkpointTimestamp) { + return index; + } + + @Override + public void restoreState(Integer state) { + index = state; + } + } +}
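The verification loop in postSubmit() above also illustrates the read-side contract for file systems that lack truncate(): if a ".valid-length" companion file exists next to a part file, consumers should read only the recorded number of bytes. A standalone sketch of that pattern (illustrative only; fs and partFile are assumed to be an existing Hadoop FileSystem and a part-file Path; the test sets the valid-length prefix to "", so only the suffix is checked here, while with the sink's defaults the companion file also carries a leading "_"):

    // Sketch: consume a part file only up to its recorded valid length.
    Path validLengthFile = partFile.suffix(".valid-length");
    long validLength = fs.getFileStatus(partFile).getLen();
    if (fs.exists(validLengthFile)) {
        try (FSDataInputStream in = fs.open(validLengthFile)) {
            // BucketingSink writes the length with writeUTF(Long.toString(...))
            validLength = Long.parseLong(in.readUTF());
        }
    }
    byte[] data = new byte[(int) validLength];  // assumes the valid region fits in memory
    try (FSDataInputStream in = fs.open(partFile)) {
        in.readFully(0, data, 0, (int) validLength);
    }
    // data now holds only the bytes confirmed by the last successful checkpoint.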