Repository: flink
Updated Branches:
  refs/heads/release-1.0 81c69f9cc -> f1d34b17b
[FLINK-3790] [streaming] Use proper hadoop config in rolling sink Closes #1919 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1d34b17 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1d34b17 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1d34b17 Branch: refs/heads/release-1.0 Commit: f1d34b17b6171a6df6b18a0e417d39d02a4506b5 Parents: 81c69f9 Author: Gyula Fora <gyf...@apache.org> Authored: Wed Apr 20 22:22:55 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Thu Apr 28 21:54:41 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/fs/RollingSink.java | 18 ++++++++++-------- .../connectors/fs/SequenceFileWriter.java | 7 +++++-- 2 files changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f1d34b17/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index f186f53..9f235e8 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -285,7 +286,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf * current part file path, the valid length of the in-progress files and pending part files. */ private transient BucketState bucketState; - + + private transient org.apache.hadoop.conf.Configuration hadoopConf; + /** * Creates a new {@code RollingSink} that writes files to the given base directory. * @@ -323,7 +326,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf bucketState = new BucketState(); } - FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration()); + hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); refTruncate = reflectTruncate(fs); // delete pending/in-progress files that might be left if we fail while @@ -418,9 +422,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf private void openNewPartFile() throws Exception { closeCurrentPartFile(); - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - - FileSystem fs = new Path(basePath).getFileSystem(conf); + FileSystem fs = new Path(basePath).getFileSystem(hadoopConf); Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath)); @@ -484,7 +486,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf if (currentPartPath != null) { Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); - FileSystem fs = 
inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration()); + FileSystem fs = inProgressPath.getFileSystem(hadoopConf); fs.rename(inProgressPath, pendingPath); LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, @@ -615,7 +617,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); - FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration()); + FileSystem fs = pendingPath.getFileSystem(hadoopConf); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location after complete checkpoint {}.", @@ -657,7 +659,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf bucketState.pendingFiles.clear(); FileSystem fs = null; try { - fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration()); + fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration()); } catch (IOException e) { LOG.error("Error while creating FileSystem in checkpoint restore.", e); throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/f1d34b17/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java index 928d96e..3c8a58e 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java +++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.SequenceFile; @@ -91,9 +92,11 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen this.outputStream = outStream; CompressionCodec codec = null; + + Configuration conf = HadoopFileSystem.getHadoopConfiguration(); if (!compressionCodecName.equals("None")) { - CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration()); + CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); codec = codecFactory.getCodecByName(compressionCodecName); if (codec == null) { throw new RuntimeException("Codec " + compressionCodecName + " not found."); @@ -101,7 +104,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen } // the non-deprecated constructor syntax is only available in recent hadoop versions... - writer = SequenceFile.createWriter(new Configuration(), + writer = SequenceFile.createWriter(conf, outStream, keyClass, valueClass,