gparai closed pull request #1576: DRILL-6894: CTAS and CTTAS are not working on S3 storage when cache i…
URL: https://github.com/apache/drill/pull/1576
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
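
In short, the change stops each record writer from building its Hadoop FileSystem out of a fresh Configuration seeded only with fs.default.name, and instead hands the format plugin's full filesystem Configuration (getFsConf()) to the JSON, text, and Parquet writers, so the connection settings of the underlying storage plugin (for example an S3 workspace) carry through to the writer. A minimal sketch of the before/after idea, assuming only hadoop-common on the classpath; FsConfSketch and its methods are illustrative names, not Drill code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Sketch only (not Drill code): shows why passing the plugin's Configuration matters.
public class FsConfSketch {

  // Before the fix: the writer created a new Configuration carrying only the default
  // filesystem name, so every other property configured on the storage plugin
  // (for example the s3a connection settings) was lost.
  static FileSystem before(String connection) throws Exception {
    Configuration conf = new Configuration();
    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, connection);
    return FileSystem.get(conf);
  }

  // After the fix: the plugin's full Configuration (getFsConf() in the diff) is copied
  // and used directly, so the writer sees the same filesystem settings as the readers.
  static FileSystem after(Configuration pluginFsConf) throws Exception {
    return FileSystem.get(new Configuration(pluginFsConf));
  }
}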

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 721e80002d1..11dc2042146 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -33,13 +33,11 @@
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -72,20 +70,17 @@ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer)
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", " ");
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", "json");
     options.put("extended", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
     options.put("uglify", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
     options.put("skipnulls", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
     options.put("enableNanInf", Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
-    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy());
+
+    RecordWriter recordWriter = new JsonRecordWriter(writer.getStorageStrategy(), getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 9e6aaf8d9dd..2e80b3ffb14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -64,8 +64,11 @@
   // Record write status
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
 
-  public JsonRecordWriter(StorageStrategy storageStrategy){
+  private Configuration fsConf;
+
+  public JsonRecordWriter(StorageStrategy storageStrategy, Configuration fsConf) {
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -78,9 +81,7 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
     final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify"));
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     Path fileName = new Path(location, prefix + "_" + index + "." + extension);
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bc129ae1d1a..2ac24d8d07c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,6 @@
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
@@ -50,7 +49,6 @@
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -117,17 +115,14 @@ public RecordWriter getRecordWriter(final FragmentContext context, final EasyWri
     final Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
-
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
-
     options.put("separator", getConfig().getFieldDelimiterAsString());
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put("extension", getConfig().getExtensions().get(0));
 
-    RecordWriter recordWriter = new DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
+    RecordWriter recordWriter = new DrillTextRecordWriter(
+        context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
     recordWriter.init(options);
 
     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 2c409963a0f..f46cc1cf299 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,7 +47,6 @@
 import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
@@ -140,8 +139,6 @@ public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter write
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
 
-    options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig) writer.getStorageConfig()).getConnection());
-
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
     options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
       context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45233c4da4e..5a64f4070d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -132,6 +132,7 @@ public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws
     this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
     this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy();
     this.cleanUpLocations = Lists.newArrayList();
+    this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
   }
 
   @Override
@@ -139,8 +140,6 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
     this.prefix = writerOptions.get("prefix");
 
-    conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
     fs = FileSystem.get(conf);
     blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
     pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 7b7c47fe4c9..83a00bd1784 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -56,9 +56,12 @@
   private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called
   private StringBuilder currentRecord; // contains the current record separated by field delimiter
 
-  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy) {
+  private Configuration fsConf;
+
+  public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy storageStrategy, Configuration fsConf) {
     super(allocator);
     this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT : storageStrategy;
+    this.fsConf = new Configuration(fsConf);
   }
 
   @Override
@@ -68,9 +71,7 @@ public void init(Map<String, String> writerOptions) throws IOException {
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
 
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.get(fsConf);
 
     this.currentRecord = new StringBuilder();
     this.index = 0;


 
