This is an automated email from the ASF dual-hosted git repository. arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 8712ffd8be479b7f71bb65f605f41b5135e74216 Author: Arina Ielchiieva <arina.yelchiy...@gmail.com> AuthorDate: Mon Mar 18 18:19:39 2019 +0200 DRILL-7111: Fix table function execution for directories closes #1700 --- .../exec/store/dfs/WorkspaceSchemaFactory.java | 69 +++++++++++++--------- .../org/apache/drill/TestSelectWithOption.java | 39 ++++++++++-- 2 files changed, 77 insertions(+), 31 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index b6b29e7..2e302f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -140,7 +140,7 @@ public class WorkspaceSchemaFactory { throw new ExecutionSetupException(message); } final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, - ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of()); + ImmutableList.of(Pattern.compile(".*")), ImmutableList.of()); fileMatchers.add(fallbackMatcher); dropFileMatchers = fileMatchers.subList(0, fileMatchers.size() - 1); } else { @@ -163,11 +163,10 @@ public class WorkspaceSchemaFactory { * Checks whether a FileSystem object has the permission to list/read workspace directory * @param fs a DrillFileSystem object that was created with certain user privilege * @return True if the user has access. False otherwise. - * @throws IOException */ public boolean accessible(DrillFileSystem fs) throws IOException { try { - /** + /* * For Windows local file system, fs.access ends up using DeprecatedRawLocalFileStatus which has * TrustedInstaller as owner, and a member of Administrators group could not satisfy the permission. * In this case, we will still use method listStatus. @@ -427,11 +426,10 @@ public class WorkspaceSchemaFactory { // Drill Process User file-system private DrillFileSystem dpsFs; - public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException { + public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) { super(parentSchemaPath, wsName); this.schemaConfig = schemaConfig; this.fs = fs; - //this.fs = ImpersonationUtil.createFileSystem(schemaConfig.getUserName(), fsConf); this.dpsFs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf); } @@ -722,10 +720,6 @@ public class WorkspaceSchemaFactory { return FileSystemConfig.NAME; } - private DrillTable isReadable(FormatMatcher m, FileSelection fileSelection) throws IOException { - return m.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig); - } - @Override public DrillTable create(TableInstance key) { try { @@ -734,13 +728,20 @@ public class WorkspaceSchemaFactory { return null; } - final boolean hasDirectories = fileSelection.containsDirectories(getFS()); + boolean hasDirectories = fileSelection.containsDirectories(getFS()); + if (key.sig.params.size() > 0) { - FormatPluginConfig fconfig = optionExtractor.createConfigForTable(key); - return new DynamicDrillTable( - plugin, storageEngineName, schemaConfig.getUserName(), - new FormatSelection(fconfig, fileSelection)); + FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories); + + if (newSelection.isEmptyDirectory()) { + return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection); + } + + FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key); + FormatSelection selection = new FormatSelection(formatConfig, newSelection); + return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection); } + if (hasDirectories) { for (final FormatMatcher matcher : dirMatchers) { try { @@ -754,10 +755,8 @@ public class WorkspaceSchemaFactory { } } - final FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection; - if (newSelection == null) { - // empty directory / selection means that this is the empty and schemaless table - fileSelection.setEmptyDirectoryStatus(); + FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories); + if (newSelection.isEmptyDirectory()) { return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection); } @@ -783,8 +782,25 @@ public class WorkspaceSchemaFactory { return null; } + /** + * Expands given file selection if it has directories. + * If expanded file selection is null (i.e. directory is empty), sets empty directory status to true. + * + * @param fileSelection file selection + * @param hasDirectories flag that indicates if given file selection has directories + * @return revisited file selection + */ + private FileSelection detectEmptySelection(FileSelection fileSelection, boolean hasDirectories) throws IOException { + FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection; + if (newSelection == null) { + // empty directory / selection means that this is the empty and schemaless table + fileSelection.setEmptyDirectoryStatus(); + return fileSelection; + } + return newSelection; + } + private FormatMatcher findMatcher(FileStatus file) { - FormatMatcher matcher = null; try { for (FormatMatcher m : dropFileMatchers) { if (m.isFileReadable(getFS(), file)) { @@ -794,7 +810,7 @@ public class WorkspaceSchemaFactory { } catch (IOException e) { logger.debug("Failed to find format matcher for file: %s", file, e); } - return matcher; + return null; } @Override @@ -820,8 +836,7 @@ public class WorkspaceSchemaFactory { } FormatMatcher matcher = null; - Queue<FileStatus> listOfFiles = new LinkedList<>(); - listOfFiles.addAll(fileSelection.getStatuses(getFS())); + Queue<FileStatus> listOfFiles = new LinkedList<>(fileSelection.getStatuses(getFS())); while (!listOfFiles.isEmpty()) { FileStatus currentFile = listOfFiles.poll(); @@ -865,13 +880,13 @@ public class WorkspaceSchemaFactory { StringBuilder tableRenameBuilder = new StringBuilder(); int lastSlashIndex = table.lastIndexOf(Path.SEPARATOR); if (lastSlashIndex != -1) { - tableRenameBuilder.append(table.substring(0, lastSlashIndex + 1)); + tableRenameBuilder.append(table, 0, lastSlashIndex + 1); } // Generate unique identifier which will be added as a suffix to the table name ThreadLocalRandom r = ThreadLocalRandom.current(); long time = (System.currentTimeMillis()/1000); - Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt(); - Long p2 = r.nextLong(); + long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt(); + long p2 = r.nextLong(); final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX; String[] pathSplit = table.split(Path.SEPARATOR); /* @@ -884,9 +899,9 @@ public class WorkspaceSchemaFactory { .append(DrillFileSystem.UNDERSCORE_PREFIX) .append(pathSplit[pathSplit.length - 1]) .append(fileNameDelimiter) - .append(p1.toString()) + .append(p1) .append(fileNameDelimiter) - .append(p2.toString()); + .append(p2); String tableRename = tableRenameBuilder.toString(); fs.rename(new Path(defaultLocation, table), new Path(defaultLocation, tableRename)); diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java index a6dff74..ec23194 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java @@ -25,10 +25,11 @@ import static org.junit.Assert.assertThat; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Paths; import org.apache.drill.categories.SqlTest; import org.apache.drill.common.exceptions.UserRemoteException; -import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.TestBuilder; import org.junit.Test; @@ -36,13 +37,12 @@ import org.junit.experimental.categories.Category; @Category(SqlTest.class) public class TestSelectWithOption extends BaseTestQuery { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class); private File genCSVFile(String name, String... rows) throws IOException { File file = new File(format("%s/%s.csv", dirTestWatcher.getRootDir(), name)); try (FileWriter fw = new FileWriter(file)) { - for (int i = 0; i < rows.length; i++) { - fw.append(rows[i] + "\n"); + for (String row : rows) { + fw.append(row).append("\n"); } } return file; @@ -291,4 +291,35 @@ public class TestSelectWithOption extends BaseTestQuery { throw e; } } + + @Test + public void testTableFunctionWithDirectoryExpansion() throws Exception { + String tableName = "dirTable"; + String query = "select 'A' as col from (values(1))"; + test("use dfs.tmp"); + try { + alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv"); + test("create table %s as %s", tableName, query); + + testBuilder() + .sqlQuery("select * from table(%s(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName) + .unOrdered() + .sqlBaselineQuery(query) + .go(); + } finally { + resetSessionOption(ExecConstants.OUTPUT_FORMAT_OPTION); + test("drop table if exists %s", tableName); + } + } + + @Test + public void testTableFunctionWithEmptyDirectory() throws Exception { + String tableName = "emptyTable"; + dirTestWatcher.makeTestTmpSubDir(Paths.get(tableName)); + testBuilder() + .sqlQuery("select * from table(dfs.tmp.`%s`(type=>'text', fieldDelimiter => ',', extractHeader => true))", tableName) + .expectsEmptyResultSet() + .go(); + } + }