Repository: flink
Updated Branches:
  refs/heads/master d636bf78e -> d45cb69af


[FLINK-3790] [streaming] Use proper hadoop config in rolling sink

Closes #1919
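
The gist of the change: RollingSink and SequenceFileWriter previously built a
fresh "new org.apache.hadoop.conf.Configuration()", which only sees Hadoop's
bundled defaults. They now reuse HadoopFileSystem.getHadoopConfiguration(), the
Flink helper that locates and loads the cluster's Hadoop configuration. A
minimal sketch of the pattern (the class and method here are illustrative, not
part of the patch):

    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ProperHadoopConfSketch {
        // Hypothetical helper: opens a FileSystem for a base path the way
        // the patched RollingSink does.
        public static FileSystem openFileSystem(String basePath) throws java.io.IOException {
            // Resolves the Hadoop configuration through Flink (e.g. from a
            // configured Hadoop conf directory) rather than falling back to
            // Hadoop's built-in defaults.
            org.apache.hadoop.conf.Configuration hadoopConf =
                    HadoopFileSystem.getHadoopConfiguration();
            return new Path(basePath).getFileSystem(hadoopConf);
        }
    }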


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45cb69a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45cb69a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45cb69a

Branch: refs/heads/master
Commit: d45cb69afd38e5dabf64f90e35a21e56a25ddd6f
Parents: d636bf7
Author: Gyula Fora <gyf...@apache.org>
Authored: Wed Apr 20 22:22:55 2016 +0200
Committer: Gyula Fora <gyf...@apache.org>
Committed: Thu Apr 21 11:34:30 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/fs/RollingSink.java      | 18 ++++++++++--------
 .../connectors/fs/SequenceFileWriter.java         |  8 +++++---
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 76324d7..799e908 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -279,7 +280,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
         * current part file path, the valid length of the in-progress files and pending part files.
         */
        private transient BucketState bucketState;
-
+       
+       private transient org.apache.hadoop.conf.Configuration hadoopConf;
+       
        /**
         * Creates a new {@code RollingSink} that writes files to the given base directory.
         *
@@ -317,7 +320,8 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
                        bucketState = new BucketState();
                }
 
-               FileSystem fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+               hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+               FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
                refTruncate = reflectTruncate(fs);
 
                // delete pending/in-progress files that might be left if we fail while
@@ -412,9 +416,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
        private void openNewPartFile() throws Exception {
                closeCurrentPartFile();
 
-               org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-
-               FileSystem fs = new Path(basePath).getFileSystem(conf);
+               FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
 
                Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
 
@@ -466,7 +468,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
                if (currentPartPath != null) {
                        Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix);
                        Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix);
-                       FileSystem fs = inProgressPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+                       FileSystem fs = inProgressPath.getFileSystem(hadoopConf);
                        fs.rename(inProgressPath, pendingPath);
                        LOG.debug("Moving in-progress bucket {} to pending file {}",
                                        inProgressPath,
@@ -541,7 +543,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
                                                Path pendingPath = new Path(finalPath.getParent(),
                                                                pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
 
-                                               FileSystem fs = pendingPath.getFileSystem(new org.apache.hadoop.conf.Configuration());
+                                               FileSystem fs = pendingPath.getFileSystem(hadoopConf);
                                                fs.rename(pendingPath, finalPath);
                                                LOG.debug(
                                                                "Moving pending file {} to final location after complete checkpoint {}.",
@@ -579,7 +581,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf
                bucketState.pendingFiles.clear();
                FileSystem fs = null;
                try {
-                       fs = new Path(basePath).getFileSystem(new org.apache.hadoop.conf.Configuration());
+                       fs = new Path(basePath).getFileSystem(HadoopFileSystem.getHadoopConfiguration());
                } catch (IOException e) {
                        LOG.error("Error while creating FileSystem in checkpoint restore.", e);
                        throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
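
Note on the hadoopConf field above: Hadoop's Configuration is kept in a
transient field and rebuilt in open(), so it is never serialized with the
sink; restoreState() (the last hunk) calls getHadoopConfiguration() directly,
presumably because restore can run before open() has initialized the field.
A sketch of that lifecycle, using a hypothetical sink (not the real
RollingSink):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TransientConfSink extends RichSinkFunction<String> {

        private final String basePath;

        // transient: rebuilt per task instance, never checkpointed
        private transient org.apache.hadoop.conf.Configuration hadoopConf;

        public TransientConfSink(String basePath) {
            this.basePath = basePath;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            hadoopConf = HadoopFileSystem.getHadoopConfiguration();
        }

        @Override
        public void invoke(String value) throws Exception {
            FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
            // ... write value via fs ...
        }
    }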

http://git-wip-us.apache.org/repos/asf/flink/blob/d45cb69a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 32b8d49..c71e97f 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -86,9 +87,11 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
                }
 
                CompressionCodec codec = null;
+               
+               Configuration conf = HadoopFileSystem.getHadoopConfiguration();
 
                if (!compressionCodecName.equals("None")) {
-                       CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
+                       CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
                        codec = codecFactory.getCodecByName(compressionCodecName);
                        if (codec == null) {
                                throw new RuntimeException("Codec " + compressionCodecName + " not found.");
@@ -96,7 +99,7 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
                }
 
                // the non-deprecated constructor syntax is only available in recent hadoop versions...
-               writer = SequenceFile.createWriter(new Configuration(),
+               writer = SequenceFile.createWriter(conf,
                                getStream(),
                                keyClass,
                                valueClass,
@@ -119,7 +122,6 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
                if (!type.isTupleType()) {
                        throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
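
For the codec path touched above: CompressionCodecFactory resolves codec names
against the Configuration it is given (including any codecs registered through
the cluster's Hadoop config), which is why the factory now receives the
resolved configuration instead of a fresh one. A small sketch of that lookup
(class and method names are illustrative):

    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecLookupSketch {
        public static CompressionCodec lookup(String codecName) {
            // Same resolution the patched SequenceFileWriter performs: the
            // factory reads codec registrations from this Configuration.
            Configuration conf = HadoopFileSystem.getHadoopConfiguration();
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodecByName(codecName);
            if (codec == null) {
                throw new RuntimeException("Codec " + codecName + " not found.");
            }
            return codec;
        }
    }

    // e.g. CompressionCodec snappy = CodecLookupSketch.lookup("Snappy");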
