http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
new file mode 100644
index 0000000..cf2c373
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -0,0 +1,1082 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
+import org.apache.flink.streaming.connectors.fs.StringWriter;
+import org.apache.flink.streaming.connectors.fs.Writer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.Iterator;
+
+/**
+ * Sink that emits its input elements to {@link 
org.apache.hadoop.fs.FileSystem} files within
+ * buckets. This is integrated with the checkpointing mechanism to provide 
exactly-once semantics.
+ *
+ * <p>
+ * When creating the sink a {@code basePath} must be specified. The base 
directory contains
+ * one directory for every bucket. The bucket directories themselves contain 
several part files,
+ * one for each parallel subtask of the sink. These part files contain the 
actual output data.
+ *
+ * <p>
+ * The sink uses a {@link Bucketer} to determine in which bucket directory 
each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for 
example, use time or
+ * a property of the element to determine the bucket directory. The default 
{@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You 
can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For 
example, use the
+ * {@link BasePathBucketer} if you don't want to have buckets but still want 
to write part-files
+ * in a fault-tolerant way.
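+ * For example, assuming {@code sink} is a {@code BucketingSink<String>} (an
+ * illustrative sketch only):
+ * <pre>{@code
+ *     sink.setBucketer(new BasePathBucketer<String>());
+ * }</pre>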
+ *
+ * <p>
+ * The filenames of the part files contain the part prefix, the parallel 
subtask index of the sink
+ * and a rolling counter. For example, the file {@code "part-1-17"} contains
the data from
+ * {@code subtask 1} of the sink and carries the rolling counter {@code 17} within
its bucket. By default,
+ * the part prefix is {@code "part"} but this can be configured using {@link 
#setPartPrefix(String)}.
+ * When a part file becomes bigger than the user-specified batch size, the
current part file is closed,
+ * the part counter is increased and a new part file is created. The batch 
size defaults to {@code 384 MB};
+ * this can be configured using {@link #setBatchSize(long)}.
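+ * For example, a bucket directory written by subtasks {@code 0} and {@code 1} might
+ * contain part files such as (illustrative names only):
+ * <pre>{@code
+ *     part-0-0    part-0-1    part-1-0    part-1-1
+ * }</pre>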
+ *
+ * <p>
+ * In some scenarios, the open buckets are required to change based on time. 
In these cases, the sink
+ * needs to determine when a bucket has become inactive, in order to flush and 
close the part file.
+ * To support this there are two configurable settings:
+ * <ol>
+ *     <li>the frequency to check for inactive buckets, configured by {@link 
#setInactiveBucketCheckInterval(long)},
+ *     and</li>
+ *     <li>the minimum amount of time a bucket has to not receive any data 
before it is considered inactive,
+ *     configured by {@link #setInactiveBucketThreshold(long)}</li>
+ * </ol>
+ * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
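+ * For example, to check every 5 minutes and close buckets after 10 minutes of
+ * inactivity (an illustrative sketch, assuming {@code sink} is the configured sink):
+ * <pre>{@code
+ *     sink.setInactiveBucketCheckInterval(5 * 60 * 1000L)
+ *         .setInactiveBucketThreshold(10 * 60 * 1000L);
+ * }</pre>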
+ *
+ * <p>
+ * Part files can be in one of three states: {@code in-progress}, {@code 
pending} or {@code finished}.
+ * The reason for this is how the sink works together with the checkpointing 
mechanism to provide exactly-once
+ * semantics and fault-tolerance. The part file that is currently being 
written to is {@code in-progress}. Once
+ * a part file is closed for writing it becomes {@code pending}. When a 
checkpoint is successful the currently
+ * pending files will be moved to {@code finished}.
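+ * For example, with the default prefixes and suffixes a single part file moves through
+ * the following names (illustrative):
+ * <pre>{@code
+ *     _part-1-0.in-progress  ->  _part-1-0.pending  ->  part-1-0
+ * }</pre>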
+ *
+ * <p>
+ * In case of a failure, and in order to guarantee exactly-once semantics, the
sink should roll back to the state it
+ * had when that last successful checkpoint occurred. To this end, when 
restoring, the restored files in {@code pending}
+ * state are transferred into the {@code finished} state while any {@code 
in-progress} files are rolled back, so that
+ * they do not contain data that arrived after the checkpoint from which we 
restore. If the {@code FileSystem} supports
+ * the {@code truncate()} method this will be used to reset the file back to 
its previous state. If not, a special
+ * file with the same name as the part file and the suffix {@code 
".valid-length"} will be created that contains the
+ * length up to which the file contains valid data. When reading the file, it 
must be ensured that it is only read up
+ * to that point. The prefixes and suffixes for the different file states and 
valid-length files can be configured
+ * using the adequate setter method, e.g. {@link #setPendingSuffix(String)}.
+ *
+ * <p>
+ * <b>NOTE:</b>
+ * <ol>
+ *     <li>
+ *         If checkpointing is not enabled the pending files will never be 
moved to the finished state. In that case,
+ *         the pending suffix/prefix can be set to {@code ""} to make the sink 
work in a non-fault-tolerant way but
+ *         still provide output without prefixes and suffixes.
+ *     </li>
+ *     <li>
+ *         The part files are written using an instance of {@link Writer}. By 
default, a
+ *         {@link org.apache.flink.streaming.connectors.fs.StringWriter} is 
used, which writes the result
+ *         of {@code toString()} for every element, separated by newlines. You 
can configure the writer using the
+ *         {@link #setWriter(Writer)}. For example, {@link 
org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
+ *         can be used to write Hadoop {@code SequenceFiles}.
+ *     </li>
+ * </ol>
+ *
+ * <p>
+ * Example:
+ * <pre>{@code
+ *     new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
+ *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
+ *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
+ * }</pre>
+ *
+ * This will create a sink that writes to {@code SequenceFiles} and rolls 
every minute.
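+ *
+ * <p>
+ * A more complete configuration could look as follows (an illustrative sketch; the
+ * values shown are arbitrary):
+ * <pre>{@code
+ *     BucketingSink<String> sink = new BucketingSink<String>("/base/path")
+ *         .setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"))
+ *         .setWriter(new StringWriter<String>())
+ *         .setBatchSize(1024 * 1024 * 400) // 400 MB
+ *         .setPendingSuffix(".pending");
+ * }</pre>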
+ *
+ * @see DateTimeBucketer
+ * @see StringWriter
+ * @see SequenceFileWriter
+ *
+ * @param <T> Type of the elements emitted by this sink
+ */
+public class BucketingSink<T>
+               extends RichSinkFunction<T>
+               implements InputTypeConfigurable, CheckpointedFunction, 
CheckpointListener, ProcessingTimeCallback {
+
+       private static final long serialVersionUID = 1L;
+
+       private static Logger LOG = 
LoggerFactory.getLogger(BucketingSink.class);
+
+       // 
--------------------------------------------------------------------------------------------
+       //  User configuration values
+       // 
--------------------------------------------------------------------------------------------
+       // These are initialized with some defaults but are meant to be 
changeable by the user
+
+       /**
+        * The default maximum size of part files (currently {@code 384 MB}).
+        */
+       private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
+
+       /**
+        * The default time between checks for inactive buckets. By default, 
{@code 60 sec}.
+        */
+       private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 
1000L;
+
+       /**
+        * The default threshold (in {@code ms}) for marking a bucket as 
inactive and
+        * closing its part files. By default, {@code 60 sec}.
+        */
+       private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
+
+       /**
+        * The suffix for {@code in-progress} part files. These are files we are
+        * currently writing to, but which were not yet confirmed by a 
checkpoint.
+        */
+       private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
+
+       /**
+        * The prefix for {@code in-progress} part files. These are files we are
+        * currently writing to, but which were not yet confirmed by a 
checkpoint.
+        */
+       private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
+
+       /**
+        * The suffix for {@code pending} part files. These are closed files 
that we are
+        * not currently writing to (inactive or reached {@link #batchSize}), 
but which
+        * were not yet confirmed by a checkpoint.
+        */
+       private final String DEFAULT_PENDING_SUFFIX = ".pending";
+
+       /**
+        * The prefix for {@code pending} part files. These are closed files 
that we are
+        * not currently writing to (inactive or reached {@link #batchSize}), 
but which
+        * were not yet confirmed by a checkpoint.
+        */
+       private final String DEFAULT_PENDING_PREFIX = "_";
+
+       /**
+        * When {@code truncate()} is not supported by the used {@link 
FileSystem}, we create
+        * a file along the part file with this suffix that contains the length 
up to which
+        * the part file is valid.
+        */
+       private final String DEFAULT_VALID_SUFFIX = ".valid-length";
+
+       /**
+        * When {@code truncate()} is not supported by the used {@link 
FileSystem}, we create
a file along the part file with this prefix that contains the
length up to which
+        * the part file is valid.
+        */
+       private final String DEFAULT_VALID_PREFIX = "_";
+
+       /**
+        * The default prefix for part files.
+        */
+       private final String DEFAULT_PART_REFIX = "part";
+
+       /**
+        * The default timeout for asynchronous operations such as recoverLease 
and truncate (in {@code ms}).
+        */
+       private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
+
+
+       /**
+        * The base {@code Path} that stores all bucket directories.
+        */
+       private final String basePath;
+
+       /**
+        * The {@code Bucketer} that is used to determine the path of bucket 
directories.
+        */
+       private Bucketer<T> bucketer;
+
+       /**
+        * We have a template and call duplicate() for each parallel writer in 
open() to get the actual
+        * writer that is used for the part files.
+        */
+       private Writer<T> writerTemplate;
+
+       private long batchSize = DEFAULT_BATCH_SIZE;
+       private long inactiveBucketCheckInterval = 
DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
+       private long inactiveBucketThreshold = 
DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
+
+       // These are the actually configured prefixes/suffixes
+       private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
+       private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
+
+       private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
+       private String pendingPrefix = DEFAULT_PENDING_PREFIX;
+
+       private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
+       private String validLengthPrefix = DEFAULT_VALID_PREFIX;
+
+       private String partPrefix = DEFAULT_PART_REFIX;
+
+       /**
+        * The timeout for asynchronous operations such as recoverLease and 
truncate (in {@code ms}).
+        */
+       private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Internal fields (not configurable by user)
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * We use reflection to get the {@code truncate()} method; it is only
available starting with Hadoop 2.7.
+        */
+       private transient Method refTruncate;
+
+       /**
+        * The state object that is handled by Flink from snapshot/restore. 
This contains state for
+        * every open bucket: the current in-progress part file path, its valid 
length and the pending part files.
+        */
+       private transient State<T> state;
+
+       private transient ListState<State<T>> restoredBucketStates;
+
+       /**
+        * User-defined FileSystem parameters
+        */
+       private Configuration fsConfig;
+
+       /**
+        * The FileSystem reference.
+        */
+       private transient FileSystem fs;
+
+       private transient Clock clock;
+
+       private transient ProcessingTimeService processingTimeService;
+
+       /**
+        * Creates a new {@code BucketingSink} that writes files to the given 
base directory.
+        *
+        * <p>
+        * This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link
StringWriter} as writer.
+        * The maximum bucket size is set to 384 MB.
+        *
+        * @param basePath The directory to which to write the bucket files.
+        */
+       public BucketingSink(String basePath) {
+               this.basePath = basePath;
+               this.bucketer = new DateTimeBucketer<>();
+               this.writerTemplate = new StringWriter<>();
+       }
+
+       /**
+        * Specify a custom {@code Configuration} that will be used when 
creating
+        * the {@link FileSystem} for writing.
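+        *
+        * <p>
+        * For example (an illustrative sketch; {@code "dfs.replication"} is a standard
+        * HDFS option and the value shown is arbitrary):
+        * <pre>{@code
+        *     Configuration fsConf = new Configuration();
+        *     fsConf.setString("dfs.replication", "1");
+        *     sink.setFSConfig(fsConf);
+        * }</pre>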
+        */
+       public BucketingSink<T> setFSConfig(Configuration config) {
+               this.fsConfig = new Configuration();
+               fsConfig.addAll(config);
+               return this;
+       }
+
+       /**
+        * Specify a custom {@code Configuration} that will be used when 
creating
+        * the {@link FileSystem} for writing.
+        */
+       public BucketingSink<T> 
setFSConfig(org.apache.hadoop.conf.Configuration config) {
+               this.fsConfig = new Configuration();
+               for(Map.Entry<String, String> entry : config) {
+                       fsConfig.setString(entry.getKey(), entry.getValue());
+               }
+               return this;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               if (this.writerTemplate instanceof InputTypeConfigurable) {
+                       ((InputTypeConfigurable) 
writerTemplate).setInputType(type, executionConfig);
+               }
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               Preconditions.checkArgument(this.restoredBucketStates == null, 
"The operator has already been initialized.");
+
+               initFileSystem();
+
+               if (this.refTruncate == null) {
+                       this.refTruncate = reflectTruncate(fs);
+               }
+
+               OperatorStateStore stateStore = context.getOperatorStateStore();
+               restoredBucketStates = 
stateStore.getSerializableListState("bucket-states");
+
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIndex);
+
+                       for (State<T> recoveredState : 
restoredBucketStates.get()) {
+                               handleRestoredBucketState(recoveredState);
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("{} idx {} restored {}", 
getClass().getSimpleName(), subtaskIndex, recoveredState);
+                               }
+                       }
+               } else {
+                       LOG.info("No state to restore for the {} 
(taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
+               }
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+
+               state = new State<>();
+
+               processingTimeService =
+                               ((StreamingRuntimeContext) 
getRuntimeContext()).getProcessingTimeService();
+
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
+
+               processingTimeService.registerTimer(currentProcessingTime + 
inactiveBucketCheckInterval, this);
+
+               this.clock = new Clock() {
+                       @Override
+                       public long currentTimeMillis() {
+                               return 
processingTimeService.getCurrentProcessingTime();
+                       }
+               };
+       }
+
+       /**
+        * Create a file system with the user-defined {@code HDFS} 
configuration.
+        * @throws IOException
+        */
+       private void initFileSystem() throws IOException {
+               if (fs != null) {
+                       return;
+               }
+               org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopFileSystem.getHadoopConfiguration();
+               if (fsConfig != null) {
+                       String disableCacheName = 
String.format("fs.%s.impl.disable.cache", new 
Path(basePath).toUri().getScheme());
+                       hadoopConf.setBoolean(disableCacheName, true);
+                       for (String key : fsConfig.keySet()) {
+                               hadoopConf.set(key, fsConfig.getString(key, 
null));
+                       }
+               }
+
+               fs = new Path(basePath).getFileSystem(hadoopConf);
+       }
+
+       @Override
+       public void close() throws Exception {
+               for (Map.Entry<String, BucketState<T>> entry : 
state.bucketStates.entrySet()) {
+                       closeCurrentPartFile(entry.getValue());
+               }
+       }
+
+       @Override
+       public void invoke(T value) throws Exception {
+               Path bucketPath = bucketer.getBucketPath(clock, new 
Path(basePath), value);
+
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
+
+               BucketState<T> bucketState = state.getBucketState(bucketPath);
+               if (bucketState == null) {
+                       bucketState = new BucketState<>(currentProcessingTime);
+                       state.addBucketState(bucketPath, bucketState);
+               }
+
+               if (shouldRoll(bucketState)) {
+                       openNewPartFile(bucketPath, bucketState);
+               }
+
+               bucketState.writer.write(value);
+               bucketState.lastWrittenToTime = currentProcessingTime;
+       }
+
+       /**
+        * Returns {@code true} if the current {@code part-file} should be 
closed and a new one should be created.
+        * This happens if:
+        * <ol>
+        *     <li>no file has been created yet for the task to write to, or</li>
+        *     <li>the current file has reached the maximum bucket size.</li>
+        * </ol>
+        */
+       private boolean shouldRoll(BucketState<T> bucketState) throws 
IOException {
+               boolean shouldRoll = false;
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+               if (!bucketState.isWriterOpen) {
+                       shouldRoll = true;
+                       LOG.debug("BucketingSink {} starting new bucket.", 
subtaskIndex);
+               } else {
+                       long writePosition = bucketState.writer.getPos();
+                       if (writePosition > batchSize) {
+                               shouldRoll = true;
+                               LOG.debug(
+                                       "BucketingSink {} starting new bucket 
because file position {} is above batch size {}.",
+                                       subtaskIndex,
+                                       writePosition,
+                                       batchSize);
+                       }
+               }
+               return shouldRoll;
+       }
+
+       @Override
+       public void onProcessingTime(long timestamp) throws Exception {
+               long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
+
+               checkForInactiveBuckets(currentProcessingTime);
+
+               processingTimeService.registerTimer(currentProcessingTime + 
inactiveBucketCheckInterval, this);
+       }
+
+       /**
+        * Checks for inactive buckets, and closes them. Buckets are considered 
inactive if they have not been
+        * written to for a period greater than {@code inactiveBucketThreshold} 
ms. This enables in-progress
+        * files to be moved to the pending state and be finalised on the next 
checkpoint.
+        */
+       private void checkForInactiveBuckets(long currentProcessingTime) throws 
Exception {
+
+               synchronized (state.bucketStates) {
+                       for (Map.Entry<String, BucketState<T>> entry : 
state.bucketStates.entrySet()) {
+                               if (entry.getValue().lastWrittenToTime < 
currentProcessingTime - inactiveBucketThreshold) {
+                                       LOG.debug("BucketingSink {} closing 
bucket due to inactivity of over {} ms.",
+                                               
getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
+                                       closeCurrentPartFile(entry.getValue());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Closes the current part file and opens a new one with a new bucket 
path, as returned by the
+        * {@link Bucketer}. If the bucket is not new, then this will create a 
new file with the same path
+        * as its predecessor, but with an increased rolling counter (see 
{@link BucketingSink}).
+        */
+       private void openNewPartFile(Path bucketPath, BucketState<T> 
bucketState) throws Exception {
+               closeCurrentPartFile(bucketState);
+
+               if (!fs.exists(bucketPath)) {
+                       try {
+                               if (fs.mkdirs(bucketPath)) {
+                                       LOG.debug("Created new bucket 
directory: {}", bucketPath);
+                               }
+                       } catch (IOException e) {
+                               throw new RuntimeException("Could not create 
new bucket path.", e);
+                       }
+               }
+
+               // The following loop tries different partCounter values in 
ascending order until it reaches the minimum
+               // that is not yet used. This works since there is only one 
parallel subtask that tries names with this
+               // subtask id. Otherwise we would run into concurrency issues 
here. This is aligned with the way we now
+               // clean the base directory in case of rescaling.
+
+               int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+               Path partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
+               while (fs.exists(partPath) ||
+                               fs.exists(getPendingPathFor(partPath)) ||
+                               fs.exists(getInProgressPathFor(partPath))) {
+                       bucketState.partCounter++;
+                       partPath = new Path(bucketPath, partPrefix + "-" + 
subtaskIndex + "-" + bucketState.partCounter);
+               }
+
+               // increase, so we don't have to check for this name next time
+               bucketState.partCounter++;
+
+               LOG.debug("Next part path is {}", partPath.toString());
+               bucketState.currentFile = partPath.toString();
+
+               Path inProgressPath = getInProgressPathFor(partPath);
+               if (bucketState.writer == null) {
+                       bucketState.writer = writerTemplate.duplicate();
+               }
+
+               bucketState.writer.open(fs, inProgressPath);
+               bucketState.isWriterOpen = true;
+       }
+
+       /**
+        * Closes the current part file and moves it from the in-progress state 
to the pending state.
+        */
+       private void closeCurrentPartFile(BucketState<T> bucketState) throws 
Exception {
+               if (bucketState.isWriterOpen) {
+                       bucketState.writer.close();
+                       bucketState.isWriterOpen = false;
+               }
+
+               if (bucketState.currentFile != null) {
+                       Path currentPartPath = new 
Path(bucketState.currentFile);
+                       Path inProgressPath = 
getInProgressPathFor(currentPartPath);
+                       Path pendingPath = getPendingPathFor(currentPartPath);
+
+                       fs.rename(inProgressPath, pendingPath);
+                       LOG.debug("Moving in-progress bucket {} to pending file 
{}",
+                               inProgressPath,
+                               pendingPath);
+                       
bucketState.pendingFiles.add(currentPartPath.toString());
+                       bucketState.currentFile = null;
+               }
+       }
+
+       /**
+        * Gets the truncate() call using reflection.
+        * <p>
+        * <b>NOTE:</b> This code comes from Flume.
+        */
+       private Method reflectTruncate(FileSystem fs) {
+               Method m = null;
+               if(fs != null) {
+                       Class<?> fsClass = fs.getClass();
+                       try {
+                               m = fsClass.getMethod("truncate", Path.class, 
long.class);
+                       } catch (NoSuchMethodException ex) {
+                               LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
+                                       " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
+                               return null;
+                       }
+
+                       // verify that truncate actually works
+                       FSDataOutputStream outputStream;
+                       Path testPath = new Path(UUID.randomUUID().toString());
+                       try {
+                               outputStream = fs.create(testPath);
+                               outputStream.writeUTF("hello");
+                               outputStream.close();
+                       } catch (IOException e) {
+                               LOG.error("Could not create file for checking 
if truncate works.", e);
+                               throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
+                       }
+
+                       try {
+                               m.invoke(fs, testPath, 2);
+                       } catch (IllegalAccessException | 
InvocationTargetException e) {
+                               LOG.debug("Truncate is not supported.", e);
+                               m = null;
+                       }
+
+                       try {
+                               fs.delete(testPath, false);
+                       } catch (IOException e) {
+                               LOG.error("Could not delete truncate test 
file.", e);
+                               throw new RuntimeException("Could not delete 
truncate test file.", e);
+                       }
+               }
+               return m;
+       }
+
+       private Path getPendingPathFor(Path path) {
+               return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
+       }
+
+       private Path getInProgressPathFor(Path path) {
+               return new Path(path.getParent(), inProgressPrefix + 
path.getName()).suffix(inProgressSuffix);
+       }
+
+       private Path getValidLengthPathFor(Path path) {
+               return new Path(path.getParent(), validLengthPrefix + 
path.getName()).suffix(validLengthSuffix);
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               synchronized (state.bucketStates) {
+
+                       Iterator<Map.Entry<String, BucketState<T>>> 
bucketStatesIt = state.bucketStates.entrySet().iterator();
+                       while (bucketStatesIt.hasNext()) {
+                               BucketState<T> bucketState = 
bucketStatesIt.next().getValue();
+                               synchronized 
(bucketState.pendingFilesPerCheckpoint) {
+
+                                       Iterator<Map.Entry<Long, List<String>>> 
pendingCheckpointsIt =
+                                               
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
+
+                                       while (pendingCheckpointsIt.hasNext()) {
+
+                                               Map.Entry<Long, List<String>> 
entry = pendingCheckpointsIt.next();
+                                               Long pastCheckpointId = 
entry.getKey();
+                                               List<String> pendingPaths = 
entry.getValue();
+
+                                               if (pastCheckpointId <= 
checkpointId) {
+                                                       LOG.debug("Moving 
pending files to final location for checkpoint {}", pastCheckpointId);
+
+                                                       for (String filename : 
pendingPaths) {
+                                                               Path finalPath 
= new Path(filename);
+                                                               Path 
pendingPath = getPendingPathFor(finalPath);
+
+                                                               
fs.rename(pendingPath, finalPath);
+                                                               LOG.debug(
+                                                                       "Moving 
pending file {} to final location having completed checkpoint {}.",
+                                                                       
pendingPath,
+                                                                       
pastCheckpointId);
+                                                       }
+                                                       
pendingCheckpointsIt.remove();
+                                               }
+                                       }
+
+                                       if (!bucketState.isWriterOpen &&
+                                               
bucketState.pendingFiles.isEmpty() &&
+                                               
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
+
+                                               // We've dealt with all the 
pending files and the writer for this bucket is not currently open.
+                                               // Therefore this bucket is 
currently inactive and we can remove it from our state.
+                                               bucketStatesIt.remove();
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               Preconditions.checkNotNull(restoredBucketStates, "The operator 
has not been properly initialized.");
+
+               restoredBucketStates.clear();
+
+               synchronized (state.bucketStates) {
+                       int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
+
+                       for (Map.Entry<String, BucketState<T>> bucketStateEntry 
: state.bucketStates.entrySet()) {
+                               BucketState<T> bucketState = 
bucketStateEntry.getValue();
+
+                               if (bucketState.isWriterOpen) {
+                                       bucketState.currentFileValidLength = 
bucketState.writer.flush();
+                               }
+
+                               synchronized 
(bucketState.pendingFilesPerCheckpoint) {
+                                       
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
bucketState.pendingFiles);
+                               }
+                               bucketState.pendingFiles = new ArrayList<>();
+                       }
+                       restoredBucketStates.add(state);
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("{} idx {} checkpointed {}.", 
getClass().getSimpleName(), subtaskIdx, state);
+                       }
+               }
+       }
+
+       private void handleRestoredBucketState(State<T> restoredState) {
+               Preconditions.checkNotNull(restoredState);
+
+               for (BucketState<T> bucketState : 
restoredState.bucketStates.values()) {
+
+                       // we can clean all the pending files since they were 
renamed to
+                       // final files after this checkpoint was successful
+                       // (we re-start from the last **successful** checkpoint)
+                       bucketState.pendingFiles.clear();
+
+                       if (bucketState.currentFile != null) {
+
+                               // We were writing to a file when the last 
checkpoint occurred. This file can either
+                               // be still in-progress or became a pending 
file at some point after the checkpoint.
+                               // Either way, we have to truncate it back to a 
valid state (or write a .valid-length
+                               // file that specifies up to which length it is 
valid) and rename it to the final name
+                               // before starting a new bucket file.
+
+                               Path partPath = new 
Path(bucketState.currentFile);
+                               try {
+                                       Path partPendingPath = 
getPendingPathFor(partPath);
+                                       Path partInProgressPath = 
getInProgressPathFor(partPath);
+
+                                       if (fs.exists(partPendingPath)) {
+                                               LOG.debug("In-progress file {} 
has been moved to pending after checkpoint, moving to final location.", 
partPath);
+                                               // has been moved to pending in 
the mean time, rename to final location
+                                               fs.rename(partPendingPath, 
partPath);
+                                       } else if 
(fs.exists(partInProgressPath)) {
+                                               LOG.debug("In-progress file {} 
is still in-progress, moving to final location.", partPath);
+                                               // it was still in progress, 
rename to final path
+                                               fs.rename(partInProgressPath, 
partPath);
+                                       } else if (fs.exists(partPath)) {
+                                               LOG.debug("In-Progress file {} 
was already moved to final location {}.", bucketState.currentFile, partPath);
+                                       } else {
+                                               LOG.debug("In-Progress file {} 
was neither moved to pending nor is still in progress. Possibly, " +
+                                                       "it was moved to final 
location by a previous snapshot restore", bucketState.currentFile);
+                                       }
+
+                                       // We use reflection to get the 
.truncate() method, this
+                                       // is only available starting with 
Hadoop 2.7
+                                       if (this.refTruncate == null) {
+                                               this.refTruncate = 
reflectTruncate(fs);
+                                       }
+
+                                       // truncate it or write a 
".valid-length" file to specify up to which point it is valid
+                                       if (refTruncate != null) {
+                                               LOG.debug("Truncating {} to 
valid length {}", partPath, bucketState.currentFileValidLength);
+                                               // someone else might still
hold the lease from a previous try, we are
+                                               // recovering, after all ...
+                                               if (fs instanceof 
DistributedFileSystem) {
+                                                       DistributedFileSystem 
dfs = (DistributedFileSystem) fs;
+                                                       LOG.debug("Trying to 
recover file lease {}", partPath);
+                                                       
dfs.recoverLease(partPath);
+                                                       boolean isclosed = 
dfs.isFileClosed(partPath);
+                                                       StopWatch sw = new 
StopWatch();
+                                                       sw.start();
+                                                       while (!isclosed) {
+                                                               if 
(sw.getTime() > asyncTimeout) {
+                                                                       break;
+                                                               }
+                                                               try {
+                                                                       
Thread.sleep(500);
+                                                               } catch 
(InterruptedException e1) {
+                                                                       // 
ignore it
+                                                               }
+                                                               isclosed = 
dfs.isFileClosed(partPath);
+                                                       }
+                                               }
+                                               Boolean truncated = (Boolean) 
refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
+                                               if (!truncated) {
+                                                       LOG.debug("Truncate did 
not immediately complete for {}, waiting...", partPath);
+
+                                                       // we must wait for the 
asynchronous truncate operation to complete
+                                                       StopWatch sw = new 
StopWatch();
+                                                       sw.start();
+                                                       long newLen = 
fs.getFileStatus(partPath).getLen();
+                                                       while (newLen != 
bucketState.currentFileValidLength) {
+                                                               if 
(sw.getTime() > asyncTimeout) {
+                                                                       break;
+                                                               }
+                                                               try {
+                                                                       
Thread.sleep(500);
+                                                               } catch 
(InterruptedException e1) {
+                                                                       // 
ignore it
+                                                               }
+                                                               newLen = 
fs.getFileStatus(partPath).getLen();
+                                                       }
+                                                       if (newLen != 
bucketState.currentFileValidLength) {
+                                                               throw new 
RuntimeException("Truncate did not truncate to right length. Should be " + 
bucketState.currentFileValidLength + " is " + newLen + ".");
+                                                       }
+                                               }
+                                       } else {
+                                               LOG.debug("Writing valid-length 
file for {} to specify valid length {}", partPath, 
bucketState.currentFileValidLength);
+                                               Path validLengthFilePath = 
getValidLengthPathFor(partPath);
+                                               if 
(!fs.exists(validLengthFilePath)) {
+                                                       FSDataOutputStream 
lengthFileOut = fs.create(validLengthFilePath);
+                                                       
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+                                                       lengthFileOut.close();
+                                               }
+                                       }
+
+                                       // Now that we've restored the bucket 
to a valid state, reset the current file info
+                                       bucketState.currentFile = null;
+                                       bucketState.currentFileValidLength = -1;
+                                       bucketState.isWriterOpen = false;
+                               } catch (IOException e) {
+                                       LOG.error("Error while restoring 
BucketingSink state.", e);
+                                       throw new RuntimeException("Error while 
restoring BucketingSink state.", e);
+                               } catch (InvocationTargetException | 
IllegalAccessException e) {
+                                       LOG.error("Could not invoke truncate.", 
e);
+                                       throw new RuntimeException("Could not 
invoke truncate.", e);
+                               }
+                       }
+
+                       // Move files that are confirmed by a checkpoint but 
did not get moved to final location
+                       // because the checkpoint notification did not happen 
before a failure
+
+                       LOG.debug("Moving pending files to final location on 
restore.");
+
+                       Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
+                       for (Long pastCheckpointId : pastCheckpointIds) {
+                               // All the pending files are buckets that have 
been completed but are waiting to be renamed
+                               // to their final name
+                               for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+                                       Path finalPath = new Path(filename);
+                                       Path pendingPath = 
getPendingPathFor(finalPath);
+
+                                       try {
+                                               if (fs.exists(pendingPath)) {
+                                                       LOG.debug("Restoring 
BucketingSink State: Moving pending file {} to final location after complete 
checkpoint {}.", pendingPath, pastCheckpointId);
+                                                       fs.rename(pendingPath, 
finalPath);
+                                               }
+                                       } catch (IOException e) {
+                                               LOG.error("Restoring 
BucketingSink State: Error while renaming pending file {} to final path {}: 
{}", pendingPath, finalPath, e);
+                                               throw new 
RuntimeException("Error while renaming pending file " + pendingPath + " to 
final path " + finalPath, e);
+                                       }
+                               }
+                       }
+
+                       synchronized (bucketState.pendingFilesPerCheckpoint) {
+                               bucketState.pendingFilesPerCheckpoint.clear();
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Setters for User configuration values
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Sets the maximum bucket size in bytes.
+        *
+        * <p>
+        * When a bucket part file becomes larger than this size a new bucket 
part file is started and
+        * the old one is closed. The name of the bucket files depends on the 
{@link Bucketer}.
+        *
+        * @param batchSize The bucket part file size in bytes.
+        */
+       public BucketingSink<T> setBatchSize(long batchSize) {
+               this.batchSize = batchSize;
+               return this;
+       }
+
+       /**
+        * Sets the time between checks for inactive buckets.
+        *
+        * @param interval The timeout, in milliseconds.
+        */
+       public BucketingSink<T> setInactiveBucketCheckInterval(long interval) {
+               this.inactiveBucketCheckInterval = interval;
+               return this;
+       }
+
+       /**
+        * Sets the threshold for marking a bucket as inactive and
closing its part files.
+        * Buckets which haven't been written to for at least this period of 
time become inactive.
+        *
+        * @param threshold The timeout, in milliseconds.
+        */
+       public BucketingSink<T> setInactiveBucketThreshold(long threshold) {
+               this.inactiveBucketThreshold = threshold;
+               return this;
+       }
+
+       /**
+        * Sets the {@link Bucketer} to use for determining the bucket files to 
write to.
+        *
+        * @param bucketer The bucketer to use.
+        */
+       public BucketingSink<T> setBucketer(Bucketer<T> bucketer) {
+               this.bucketer = bucketer;
+               return this;
+       }
+
+       /**
+        * Sets the {@link Writer} to be used for writing the incoming elements 
to bucket files.
+        *
+        * @param writer The {@code Writer} to use.
+        */
+       public BucketingSink<T> setWriter(Writer<T> writer) {
+               this.writerTemplate = writer;
+               return this;
+       }
+
+       /**
+        * Sets the suffix of in-progress part files. The default is {@code 
"in-progress"}.
+        */
+       public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) {
+               this.inProgressSuffix = inProgressSuffix;
+               return this;
+       }
+
+       /**
+        * Sets the prefix of in-progress part files. The default is {@code 
"_"}.
+        */
+       public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) {
+               this.inProgressPrefix = inProgressPrefix;
+               return this;
+       }
+
+       /**
+        * Sets the suffix of pending part files. The default is {@code 
".pending"}.
+        */
+       public BucketingSink<T> setPendingSuffix(String pendingSuffix) {
+               this.pendingSuffix = pendingSuffix;
+               return this;
+       }
+
+       /**
+        * Sets the prefix of pending part files. The default is {@code "_"}.
+        */
+       public BucketingSink<T> setPendingPrefix(String pendingPrefix) {
+               this.pendingPrefix = pendingPrefix;
+               return this;
+       }
+
+       /**
+        * Sets the suffix of valid-length files. The default is {@code 
".valid-length"}.
+        */
+       public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) {
+               this.validLengthSuffix = validLengthSuffix;
+               return this;
+       }
+
+       /**
+        * Sets the prefix of valid-length files. The default is {@code "_"}.
+        */
+       public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) {
+               this.validLengthPrefix = validLengthPrefix;
+               return this;
+       }
+
+       /**
+        * Sets the prefix of part files.  The default is {@code "part"}.
+        */
+       public BucketingSink<T> setPartPrefix(String partPrefix) {
+               this.partPrefix = partPrefix;
+               return this;
+       }
+
+       /**
+        * Disable cleanup of leftover in-progress/pending files when the sink 
is opened.
+        *
+        * <p>
+        * This should only be disabled if using the sink without checkpoints, 
to not remove
+        * the files already in the directory.
+        *
+        * @deprecated This option is deprecated and remains only for backwards 
compatibility.
+        * We do not clean up lingering files anymore.
+        */
+       @Deprecated
+       public BucketingSink<T> disableCleanupOnOpen() {
+               return this;
+       }
+
+       /**
+        * Sets the timeout for asynchronous operations such as
recoverLease and truncate.
+        *
+        * @param timeout The timeout, in milliseconds.
+        */
+       public BucketingSink<T> setAsyncTimeout(long timeout) {
+               this.asyncTimeout = timeout;
+               return this;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Internal Classes
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * This is used during snapshot/restore to keep track of in-progress 
buckets.
+        * For each bucket, we maintain a state.
+        */
+       static final class State<T> implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * For every bucket directory (key), we maintain a bucket state 
(value).
+                */
+               final Map<String, BucketState<T>> bucketStates = new 
HashMap<>();
+
+               void addBucketState(Path bucketPath, BucketState<T> state) {
+                       synchronized (bucketStates) {
+                               bucketStates.put(bucketPath.toString(), state);
+                       }
+               }
+
+               BucketState<T> getBucketState(Path bucketPath) {
+                       synchronized (bucketStates) {
+                               return bucketStates.get(bucketPath.toString());
+                       }
+               }
+
+               @Override
+               public String toString() {
+                       return bucketStates.toString();
+               }
+       }
+
+       /**
+        * This is used for keeping track of the current in-progress buckets 
and files that we mark
+        * for moving from pending to final location after we get a 
checkpoint-complete notification.
+        */
+       static final class BucketState<T> implements Serializable {
+               private static final long serialVersionUID = 1L;
+
+               /**
+                * The file that was in-progress when the last checkpoint 
occurred.
+                */
+               String currentFile;
+
+               /**
+                * The valid length of the in-progress file at the time of the 
last checkpoint.
+                */
+               long currentFileValidLength = -1;
+
+               /**
+                * The time this bucket was last written to.
+                */
+               long lastWrittenToTime;
+
+               /**
+                * Pending files that accumulated since the last checkpoint.
+                */
+               List<String> pendingFiles = new ArrayList<>();
+
+               /**
+                * When doing a checkpoint we move the pending files since the 
last checkpoint to this map
+                * with the id of the checkpoint. When we get the 
checkpoint-complete notification we move
+                * pending files of completed checkpoints to their final 
location.
+                */
+               final Map<Long, List<String>> pendingFilesPerCheckpoint = new 
HashMap<>();
+
+               /**
+                * For counting the part files inside a bucket directory. Part 
files follow the pattern
+                * {@code "{part-prefix}-{subtask}-{count}"}. When creating new 
part files we increase the counter.
+                */
+               private transient int partCounter;
+
+               /**
+                * Tracks if the writer is currently opened or closed.
+                */
+               private transient boolean isWriterOpen;
+
+                * The actual writer that we use for writing the part
+                * The actual writer that we user for writing the part files.
+                */
+               private transient Writer<T> writer;
+
+               @Override
+               public String toString() {
+                       return
+                               "In-progress=" + currentFile +
+                                       " validLength=" + 
currentFileValidLength +
+                                       " pendingForNextCheckpoint=" + 
pendingFiles +
+                                       " pendingForPrevCheckpoints=" + 
pendingFilesPerCheckpoint +
+                                       " lastModified@" + lastWrittenToTime;
+               }
+
+               BucketState(long lastWrittenToTime) {
+                       this.lastWrittenToTime = lastWrittenToTime;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
new file mode 100644
index 0000000..b985e14
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ * <p>
+ * The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link BucketingSink}. The {@code dateTimePath}
+ * is determined based on the current system time and the user-provided format string.
+ *
+ * <p>
+ * {@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * This will, for example, create the following bucket path:
+ * {@code /base/1976-12-31--14/}
+ *
+ */
+public class DateTimeBucketer<T> implements Bucketer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+       private final String formatString;
+
+       private transient SimpleDateFormat dateFormatter;
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+        */
+       public DateTimeBucketer() {
+               this(DEFAULT_FORMAT_STRING);
+       }
+
+       /**
+        * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+        *
+        * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+        *                     the bucket path.
+        */
+       public DateTimeBucketer(String formatString) {
+               this.formatString = formatString;
+
+               this.dateFormatter = new SimpleDateFormat(formatString);
+       }
+
+       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+
+               this.dateFormatter = new SimpleDateFormat(formatString);
+       }
+
+       @Override
+       public Path getBucketPath(Clock clock, Path basePath, T element) {
+               String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
+               return new Path(basePath + "/" + newDateTimeString);
+       }
+
+       @Override
+       public String toString() {
+               return "DateTimeBucketer{" +
+                               "formatString='" + formatString + '\'' +
+                               '}';
+       }
+}
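
DateTimeBucketer is only one implementation of the Bucketer interface used above; a custom
bucketer only needs to provide getBucketPath. A hypothetical element-keyed variant could look
like this (Event and getCountryCode are made-up names for illustration, not part of the diff):

    public class CountryBucketer implements Bucketer<Event> {

            private static final long serialVersionUID = 1L;

            @Override
            public Path getBucketPath(Clock clock, Path basePath, Event element) {
                    // one sub-directory per country code, e.g. /base/DE/
                    return new Path(basePath + "/" + element.getCountryCode());
            }
    }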

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
 
b/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
new file mode 100644
index 0000000..fe60d94
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
new file mode 100644
index 0000000..36c0d03
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
+ *
+ * <p>
+ * This test only verifies the exactly-once behaviour of the sink. A separate test covers the
+ * rolling behaviour.
+ *
+ * @deprecated should be removed with the {@link RollingSink}.
+ */
+@Deprecated
+public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
+
+       final long NUM_STRINGS = 16_000;
+
+       @ClassRule
+       public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private static MiniDFSCluster hdfsCluster;
+       private static org.apache.hadoop.fs.FileSystem dfs;
+
+       private static String outPath;
+
+       private static final String PENDING_SUFFIX = ".pending";
+       private static final String IN_PROGRESS_SUFFIX = ".in-progress";
+
+       @BeforeClass
+       public static void createHDFS() throws IOException {
+               Configuration conf = new Configuration();
+
+               File dataDir = tempFolder.newFolder();
+
+               conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+               MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+               hdfsCluster = builder.build();
+
+               dfs = hdfsCluster.getFileSystem();
+
+               outPath = "hdfs://"
+                               + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+                               + "/string-non-rolling-out";
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               if (hdfsCluster != null) {
+                       hdfsCluster.shutdown();
+               }
+       }
+
+       @Override
+       public void testProgram(StreamExecutionEnvironment env) {
+               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+               int PARALLELISM = 12;
+
+               env.enableCheckpointing(20);
+               env.setParallelism(PARALLELISM);
+               env.disableOperatorChaining();
+
+               DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
+
+               DataStream<String> mapped = stream
+                               .map(new OnceFailingIdentityMapper(NUM_STRINGS));
+
+               RollingSink<String> sink = new RollingSink<String>(outPath)
+                               .setBucketer(new NonRollingBucketer())
+                               .setBatchSize(10000)
+                               .setValidLengthPrefix("")
+                               .setPendingPrefix("")
+                               .setPendingSuffix(PENDING_SUFFIX)
+                               .setInProgressSuffix(IN_PROGRESS_SUFFIX);
+
+               mapped.addSink(sink);
+
+       }
+
+       @Override
+       public void postSubmit() throws Exception {
+               // We read the files and verify that we have read all the strings. If a valid-length
+               // file exists we only read the file to that point. (This test should work with
+               // FileSystems that support truncate() and with others as well.)
+
+               Pattern messageRegex = Pattern.compile("message (\\d*)");
+
+               // Keep a set of the message IDs that we read. The size must equal the read count and
+               // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
+               // elements twice.
+               Set<Integer> readNumbers = Sets.newHashSet();
+
+               HashSet<String> uniqMessagesRead = new HashSet<>();
+               HashSet<String> messagesInCommittedFiles = new HashSet<>();
+
+               RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
+
+               while (files.hasNext()) {
+                       LocatedFileStatus file = files.next();
+
+                       if (!file.getPath().toString().endsWith(".valid-length")) {
+                               int validLength = (int) file.getLen();
+                               if (dfs.exists(file.getPath().suffix(".valid-length"))) {
+                                       FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
+                                       String validLengthString = inStream.readUTF();
+                                       validLength = Integer.parseInt(validLengthString);
+                                       System.out.println("VALID LENGTH: " + validLength);
+                               }
+                               FSDataInputStream inStream = dfs.open(file.getPath());
+                               byte[] buffer = new byte[validLength];
+                               inStream.readFully(0, buffer, 0, validLength);
+                               inStream.close();
+
+                               ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+
+                               InputStreamReader inStreamReader = new InputStreamReader(bais);
+                               BufferedReader br = new BufferedReader(inStreamReader);
+
+                               String line = br.readLine();
+                               while (line != null) {
+                                       Matcher matcher = messageRegex.matcher(line);
+                                       if (matcher.matches()) {
+                                               uniqMessagesRead.add(line);
+
+                                               // check that in the committed files there are no duplicates
+                                               if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) {
+                                                       if (!messagesInCommittedFiles.add(line)) {
+                                                               Assert.fail("Duplicate entry in committed bucket.");
+                                                       }
+                                               }
+
+                                               int messageId = Integer.parseInt(matcher.group(1));
+                                               readNumbers.add(messageId);
+                                       } else {
+                                               Assert.fail("Read line does not match expected pattern.");
+                                       }
+                                       line = br.readLine();
+                               }
+                               br.close();
+                               inStreamReader.close();
+                               bais.close();
+                       }
+               }
+
+               // Verify that we read all strings (at-least-once)
+               Assert.assertEquals(NUM_STRINGS, readNumbers.size());
+
+               // Verify that we don't have duplicates (boom!, exactly-once)
+               Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
+       }
+
+       private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               private static volatile boolean hasFailed = false;
+
+               private final long numElements;
+
+               private long failurePos;
+               private long count;
+
+
+               OnceFailingIdentityMapper(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
+                       long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+                       long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
+
+                       failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+                       count = 0;
+               }
+
+               @Override
+               public String map(String value) throws Exception {
+                       count++;
+                       if (!hasFailed && count >= failurePos) {
+                               hasFailed = true;
+                               throw new Exception("Test Failure");
+                       }
+
+                       return value;
+               }
+       }
+
+       private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
+                       implements CheckpointedAsynchronously<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final long numElements;
+
+               private int index;
+
+               private volatile boolean isRunning = true;
+
+
+               StringGeneratingSourceFunction(long numElements) {
+                       this.numElements = numElements;
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws Exception {
+                       final Object lockingObject = ctx.getCheckpointLock();
+
+                       final int step = getRuntimeContext().getNumberOfParallelSubtasks();
+
+                       if (index == 0) {
+                               index = getRuntimeContext().getIndexOfThisSubtask();
+                       }
+
+                       while (isRunning && index < numElements) {
+
+                               Thread.sleep(1);
+                               synchronized (lockingObject) {
+                                       ctx.collect("message " + index);
+                                       index += step;
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+
+               private static String randomString(StringBuilder bld, Random rnd) {
+                       final int len = rnd.nextInt(10) + 5;
+
+                       for (int i = 0; i < len; i++) {
+                               char next = (char) (rnd.nextInt(20000) + 33);
+                               bld.append(next);
+                       }
+
+                       return bld.toString();
+               }
+
+               @Override
+               public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+                       return index;
+               }
+
+               @Override
+               public void restoreState(Integer state) {
+                       index = state;
+               }
+       }
+}
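
The valid-length convention that postSubmit() checks above can be summarised in a small
helper; this is a sketch only (readCommittedBytes is not part of the commit), showing how a
consumer using the Hadoop FileSystem API would trust only the bytes recorded in a
".valid-length" companion file:

    static byte[] readCommittedBytes(FileSystem fs, Path dataFile) throws IOException {
            // default to the full file length when no valid-length marker exists
            int validLength = (int) fs.getFileStatus(dataFile).getLen();

            Path validLengthFile = dataFile.suffix(".valid-length");
            if (fs.exists(validLengthFile)) {
                    try (FSDataInputStream in = fs.open(validLengthFile)) {
                            validLength = Integer.parseInt(in.readUTF());
                    }
            }

            byte[] buffer = new byte[validLength];
            try (FSDataInputStream in = fs.open(dataFile)) {
                    in.readFully(0, buffer, 0, validLength);
            }
            return buffer;
    }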
