gparai closed pull request #1576: DRILL-6894: CTAS and CTTAS are not working on
S3 storage when cache i…
URL: https://github.com/apache/drill/pull/1576
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 721e80002d1..11dc2042146 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -33,13 +33,11 @@
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -72,20 +70,17 @@ public RecordWriter getRecordWriter(FragmentContext
context, EasyWriter writer)
Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
-
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(),
handle.getMinorFragmentId());
options.put("prefix", fragmentId);
-
options.put("separator", " ");
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)
writer.getStorageConfig()).getConnection());
-
options.put("extension", "json");
options.put("extended",
Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_EXTENDED_TYPES)));
options.put("uglify",
Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_UGLIFY)));
options.put("skipnulls",
Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_SKIPNULLFIELDS)));
options.put("enableNanInf",
Boolean.toString(context.getOptions().getOption(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR)));
- RecordWriter recordWriter = new
JsonRecordWriter(writer.getStorageStrategy());
+
+ RecordWriter recordWriter = new
JsonRecordWriter(writer.getStorageStrategy(), getFsConf());
recordWriter.init(options);
return recordWriter;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 9e6aaf8d9dd..2e80b3ffb14 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -64,8 +64,11 @@
// Record write status
private boolean fRecordStarted = false; // true once the startRecord() is
called until endRecord() is called
- public JsonRecordWriter(StorageStrategy storageStrategy){
+ private Configuration fsConf;
+
+ public JsonRecordWriter(StorageStrategy storageStrategy, Configuration
fsConf) {
this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT :
storageStrategy;
+ this.fsConf = new Configuration(fsConf);
}
@Override
@@ -78,9 +81,7 @@ public void init(Map<String, String> writerOptions) throws
IOException {
this.skipNullFields = Boolean.parseBoolean(writerOptions.get("skipnulls"));
final boolean uglify = Boolean.parseBoolean(writerOptions.get("uglify"));
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY,
writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
- this.fs = FileSystem.get(conf);
+ this.fs = FileSystem.get(fsConf);
Path fileName = new Path(location, prefix + "_" + index + "." + extension);
try {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bc129ae1d1a..2ac24d8d07c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -39,7 +39,6 @@
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
@@ -50,7 +49,6 @@
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
@@ -117,17 +115,14 @@ public RecordWriter getRecordWriter(final FragmentContext
context, final EasyWri
final Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
-
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(),
handle.getMinorFragmentId());
options.put("prefix", fragmentId);
-
options.put("separator", getConfig().getFieldDelimiterAsString());
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)
writer.getStorageConfig()).getConnection());
-
options.put("extension", getConfig().getExtensions().get(0));
- RecordWriter recordWriter = new
DrillTextRecordWriter(context.getAllocator(), writer.getStorageStrategy());
+ RecordWriter recordWriter = new DrillTextRecordWriter(
+ context.getAllocator(), writer.getStorageStrategy(),
writer.getFormatPlugin().getFsConf());
recordWriter.init(options);
return recordWriter;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 2c409963a0f..f46cc1cf299 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -47,7 +47,6 @@
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.util.DrillFileSystemUtil;
@@ -140,8 +139,6 @@ public RecordWriter getRecordWriter(FragmentContext
context, ParquetWriter write
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(),
handle.getMinorFragmentId());
options.put("prefix", fragmentId);
- options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)
writer.getStorageConfig()).getConnection());
-
options.put(ExecConstants.PARQUET_BLOCK_SIZE,
context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45233c4da4e..5a64f4070d4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -132,6 +132,7 @@ public ParquetRecordWriter(FragmentContext context,
ParquetWriter writer) throws
this.extraMetaData.put(WRITER_VERSION_PROPERTY,
String.valueOf(ParquetWriter.WRITER_VERSION));
this.storageStrategy = writer.getStorageStrategy() == null ?
StorageStrategy.DEFAULT : writer.getStorageStrategy();
this.cleanUpLocations = Lists.newArrayList();
+ this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
}
@Override
@@ -139,8 +140,6 @@ public void init(Map<String, String> writerOptions) throws
IOException {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
- conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY,
writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
fs = FileSystem.get(conf);
blockSize =
Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
pageSize =
Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 7b7c47fe4c9..83a00bd1784 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -56,9 +56,12 @@
private boolean fRecordStarted = false; // true once the startRecord() is
called until endRecord() is called
private StringBuilder currentRecord; // contains the current record
separated by field delimiter
- public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy
storageStrategy) {
+ private Configuration fsConf;
+
+ public DrillTextRecordWriter(BufferAllocator allocator, StorageStrategy
storageStrategy, Configuration fsConf) {
super(allocator);
this.storageStrategy = storageStrategy == null ? StorageStrategy.DEFAULT :
storageStrategy;
+ this.fsConf = new Configuration(fsConf);
}
@Override
@@ -68,9 +71,7 @@ public void init(Map<String, String> writerOptions) throws
IOException {
this.fieldDelimiter = writerOptions.get("separator");
this.extension = writerOptions.get("extension");
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY,
writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
- this.fs = FileSystem.get(conf);
+ this.fs = FileSystem.get(fsConf);
this.currentRecord = new StringBuilder();
this.index = 0;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services