This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 6e152aa  HIVE-25569: Enable table definition over a single file(SFS) (#2680) (Zoltan Haindrich reviewed by Krisztian Kasa)
6e152aa is described below

commit 6e152aa28bc5116bf9210f9deb0f95d2d73183f7
Author: Zoltan Haindrich <k...@rxd.hu>
AuthorDate: Tue Nov 2 11:17:27 2021 +0100

    HIVE-25569: Enable table definition over a single file(SFS) (#2680) (Zoltan Haindrich reviewed by Krisztian Kasa)
---
 .../apache/hadoop/hive/ql/io/SingleFileSystem.java | 346 +++++++++++++++++++++
 .../services/org.apache.hadoop.fs.FileSystem       |   9 +
 .../hadoop/hive/ql/io/TestSingleFileSystem.java    | 159 ++++++++++
 ql/src/test/queries/clientpositive/sfs.q           |  17 +
 ql/src/test/results/clientpositive/llap/sfs.q.out  | 103 ++++++
 5 files changed, 634 insertions(+)
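In short, the new scheme makes a path of the form sfs+<scheme>://<path-to-file>/#SINGLEFILE# behave as a directory whose only entry is that file, so a table location can point at exactly one file. A minimal sketch of driving it through the plain Hadoop FileSystem API (illustrative only, not part of the commit; the local path and class name are hypothetical, and the bundled TestSingleFileSystem and sfs.q below exercise the same calls):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SfsListingSketch {
      public static void main(String[] args) throws Exception {
        // A single data file exposed as the sole entry of a synthetic directory.
        Path dir = new Path("sfs+file:///tmp/sfs/f1.txt/#SINGLEFILE#");
        FileSystem fs = dir.getFileSystem(new Configuration());
        for (FileStatus status : fs.listStatus(dir)) {
          // Prints exactly one entry: sfs+file:///tmp/sfs/f1.txt/#SINGLEFILE#/f1.txt
          System.out.println(status.getPath() + " " + status.getLen());
        }
      }
    }
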
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SingleFileSystem.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SingleFileSystem.java
new file mode 100644
index 0000000..e0e9bff
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SingleFileSystem.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Implements an abstraction layer which exposes a single file as the sole entry of a directory.
+ *
+ * Suppose the filesystem has a directory which contains multiple files:
+ *   file://somedir/f1.txt
+ *   file://somedir/f2.txt
+ *
+ * In Hive a table defined over a directory covers every file inside that directory.
+ * To expose just one of the files as a single-file directory, the location can be written as:
+ *
+ *   sfs+file://somedir/f1.txt/#SINGLEFILE#
+ *
+ * This path behaves as a directory containing only f1.txt and nothing else.
+ */
+/*
+ * Throughout this file there are paths of both the overlay filesystem and the underlying fs.
+ * To avoid confusion between these path types, all paths which live in the overlay fs are referred
+ * to with the "upper" keyword, and paths on the underlying fs are identified with the "lower" keyword.
+ *
+ * For example:
+ *   'sfs+file:///foo/bar/#SINGLEFILE#/bar' is an upper path
+ *   'file:///foo/bar' is a lower path
+ */
+public abstract class SingleFileSystem extends FileSystem {
+
+  public static class HDFS extends SingleFileSystem {
+  }
+
+  public static class S3A extends SingleFileSystem {
+  }
+
+  public static class ABFS extends SingleFileSystem {
+  }
+
+  public static class ABFSS extends SingleFileSystem {
+  }
+
+  public static class ADL extends SingleFileSystem {
+  }
+
+  public static class GS extends SingleFileSystem {
+  }
+
+  public static class O3FS extends SingleFileSystem {
+  }
+
+  public static class PFILE extends SingleFileSystem {
+  }
+
+  public static class FILE extends SingleFileSystem {
+  }
+
+  private static final String SINGLEFILE = "#SINGLEFILE#";
+
+  private URI uri;
+  private Configuration conf;
+  private Path workDir;
+
+  public String getScheme() {
+    return "sfs+" + getClass().getSimpleName().toLowerCase();
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+    this.uri = uri;
+    this.conf = conf;
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public FSDataInputStream open(Path upperPath, int bufferSize) throws IOException {
+    SfsInfo info = new SfsInfo(upperPath);
+    switch (info.type) {
+    case LEAF_FILE:
+      return info.lowerTargetPath.getFileSystem(conf).open(info.lowerTargetPath, bufferSize);
+    default:
+      throw unsupported("open:" + upperPath);
+    }
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path upperPath) throws IOException {
+    SfsInfo info = new SfsInfo(upperPath);
+    switch (info.type) {
+    case LEAF_FILE:
+      return makeFileStatus(info.upperTargetPath, info.lowerTargetPath);
+    case DIR_MODE:
+      return makeDirFileStatus(upperPath, removeSfsScheme(upperPath));
+    case SINGLEFILE_DIR:
+      return makeDirFileStatus(upperPath, info.lowerTargetPath);
+    default:
+      throw unsupported("fileStatus:" + upperPath);
+    }
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path upperPath) throws FileNotFoundException, IOException {
+    SfsInfo info = new SfsInfo(upperPath);
+    switch (info.type) {
+    case DIR_MODE:
+      return dirModeListStatus(upperPath);
+    case LEAF_FILE:
+    case SINGLEFILE_DIR:
+      return new FileStatus[] { makeFileStatus(info.upperTargetPath, info.lowerTargetPath) };
+    default:
+      throw unsupported("listStatus: " + upperPath);
+    }
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    workDir = new_dir;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workDir;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+      short replication, long blockSize, Progressable progress) throws IOException {
+    throw unsupported("create: " + f);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    throw unsupported("append: " + f);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw unsupported("rename: " + src + " to " + dst);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    throw unsupported("delete: " + f);
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    throw unsupported("mkdirs: " + f);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return null;
+  }
+
+  /**
+   * Represents what kind of path we are at.
+   *
+   * Each type below is illustrated with the relevant form of the following example path:
+   *
+   *   sfs+file:///foo/bar/#SINGLEFILE#/bar
+   */
+  enum SfsInodeType {
+    /**
+     * Represents the final leaf file.
+     *
+     * sfs+file:///foo/bar/#SINGLEFILE#/bar
+     */
+    LEAF_FILE,
+    /**
+     * We are at a SINGLEFILE directory node.
+     *
+     * sfs+file:///foo/bar/#SINGLEFILE#
+     */
+    SINGLEFILE_DIR,
+    /**
+     * We are on the underlying filesystem in directory mode.
+     *
+     * In this mode all files and directories of the underlying fs are shown as directories.
+     *
+     *   sfs+file:///foo/bar
+     *   sfs+file:///foo/
+     */
+    DIR_MODE,
+    /**
+     * We are at a path which doesn't exist.
+     *
+     * sfs+file:///foo/bar/#SINGLEFILE#/invalid
+     */
+    NONEXISTENT,
+  }
+
+  /**
+   * Identifies and collects basic information about the current path.
+   *
+   * The target path is also identified for both lower/upper if it is available.
+   */
+  class SfsInfo {
+
+    final private URI uri;
+    final private SfsInodeType type;
+    final private Path lowerTargetPath;
+    final private Path upperTargetPath;
+
+    public SfsInfo(Path upperPath) {
+      uri = upperPath.toUri();
+      String[] parts = uri.getPath().split(Path.SEPARATOR);
+
+      int n = parts.length;
+      if (n >= 1 && parts[n - 1].equals(SINGLEFILE)) {
+        type = SfsInodeType.SINGLEFILE_DIR;
+        lowerTargetPath = removeSfsScheme(upperPath.getParent());
+        upperTargetPath = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath() + "/" + parts[n - 2]);
+      } else {
+        if (n >= 2 && parts[n - 2].equals(SINGLEFILE)) {
+          if (n >= 3 && !parts[n - 3].equals(parts[n - 1])) {
+            type = SfsInodeType.NONEXISTENT;
+            lowerTargetPath = null;
+            upperTargetPath = null;
+          } else {
+            type = SfsInodeType.LEAF_FILE;
+            lowerTargetPath = removeSfsScheme(upperPath.getParent().getParent());
+            upperTargetPath = upperPath;
+          }
+        } else {
+          type = SfsInodeType.DIR_MODE;
+          lowerTargetPath = null;
+          upperTargetPath = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Implements listing for {@link SfsInodeType#DIR_MODE}.
+   */
+  public FileStatus[] dirModeListStatus(Path upperPath) throws IOException {
+    Path lowerPath = removeSfsScheme(upperPath);
+    FileSystem fs = lowerPath.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(lowerPath);
+    List<FileStatus> ret = new ArrayList<>();
+    if (status.isDirectory()) {
+      FileStatus[] statusList = fs.listStatus(lowerPath);
+      for (FileStatus fileStatus : statusList) {
+        ret.add(makeDirFileStatus(fileStatus));
+      }
+    } else {
+      FileStatus dirStat = makeDirFileStatus(new Path(upperPath, SINGLEFILE), lowerPath);
+      ret.add(dirStat);
+    }
+    return ret.toArray(new FileStatus[0]);
+  }
+
+  public FileStatus makeFileStatus(Path upperPath, Path lowerPath) throws IOException {
+    FileStatus status = lowerPath.getFileSystem(conf).getFileStatus(lowerPath);
+    status = new FileStatus(status);
+    status.setPath(upperPath);
+    return status;
+  }
+
+  private static FileStatus makeDirFileStatus(FileStatus lowerStatus) throws IOException {
+    return makeDirFileStatus(makeSfsPath(lowerStatus.getPath()), lowerStatus);
+  }
+
+  private Path removeSfsScheme(Path lowerTargetPath0) {
+    URI u = lowerTargetPath0.toUri();
+    return new Path(removeSfsScheme(u.getScheme()), u.getAuthority(), u.getPath());
+  }
+
+  private String removeSfsScheme(String scheme) {
+    if (scheme.startsWith("sfs+")) {
+      return scheme.substring(4);
+    }
+    if (scheme.equals("sfs")) {
+      return null;
+    }
+    throw new RuntimeException("Unexpected scheme: " + scheme);
+  }
+
+  private static Path makeSfsPath(Path path) throws IOException {
+    URI oldUri = path.toUri();
+    if (oldUri.getScheme().startsWith("sfs+")) {
+      throw new IOException("unexpected path");
+    }
+    return new Path("sfs+" + oldUri.getScheme(), oldUri.getAuthority(), oldUri.getPath());
+  }
+
+  public FileStatus makeDirFileStatus(Path upperPath, Path lowerPath) throws IOException {
+    FileStatus status = lowerPath.getFileSystem(conf).getFileStatus(lowerPath);
+    return makeDirFileStatus(upperPath, status);
+  }
+
+  public static FileStatus makeDirFileStatus(Path upperPath, FileStatus status) throws IOException {
+    FileStatus newStatus = new FileStatus(status.getLen(), true, status.getReplication(), status.getBlockSize(),
+        status.getModificationTime(), status.getAccessTime(), addExecute(status.getPermission()), status.getOwner(),
+        status.getGroup(), (status.isSymlink() ? status.getSymlink() : null), status.getPath());
+    newStatus.setPath(upperPath);
+    return newStatus;
+  }
+
+  private static FsPermission addExecute(FsPermission permission) {
+    return new FsPermission(permission.toShort() | 1 | (1 << 3) | (1 << 6));
+  }
+
+  private IOException unsupported(String str) {
+    return new IOException("Unsupported SFS filesystem operation! (" + str + ")");
+  }
+}
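
To make the SfsInodeType cases above concrete, the three path shapes of one underlying file resolve as follows. This is only a sketch, assuming a local file /tmp/sfs/f1.txt exists and the Hive ql jar is on the classpath; the class name is hypothetical, and testGetFileStatus() further down asserts the same behaviour:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SfsInodeTypeSketch {
      public static void main(String[] args) throws Exception {
        String base = "sfs+file:///tmp/sfs/f1.txt";   // hypothetical existing local file
        FileSystem fs = new Path(base).getFileSystem(new Configuration());

        // DIR_MODE: the underlying file itself shows up as a directory of the overlay fs.
        System.out.println(fs.getFileStatus(new Path(base)).isDirectory());                          // true
        // SINGLEFILE_DIR: the synthetic #SINGLEFILE# node is a directory as well.
        System.out.println(fs.getFileStatus(new Path(base + "/#SINGLEFILE#")).isDirectory());        // true
        // LEAF_FILE: the single entry inside it is the real file, readable via open().
        System.out.println(fs.getFileStatus(new Path(base + "/#SINGLEFILE#/f1.txt")).isDirectory()); // false
        // Any other name under #SINGLEFILE# maps to NONEXISTENT and fails with an IOException.
      }
    }
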
(" + str + ")"); + } +} diff --git a/ql/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/ql/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem index 09353f3..d7ca79a 100644 --- a/ql/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem +++ b/ql/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -14,3 +14,12 @@ org.apache.hadoop.hive.ql.io.NullScanFileSystem org.apache.hadoop.fs.LocalFileSystem org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem +org.apache.hadoop.hive.ql.io.SingleFileSystem$ABFS +org.apache.hadoop.hive.ql.io.SingleFileSystem$ABFSS +org.apache.hadoop.hive.ql.io.SingleFileSystem$ADL +org.apache.hadoop.hive.ql.io.SingleFileSystem$HDFS +org.apache.hadoop.hive.ql.io.SingleFileSystem$S3A +org.apache.hadoop.hive.ql.io.SingleFileSystem$FILE +org.apache.hadoop.hive.ql.io.SingleFileSystem$PFILE +org.apache.hadoop.hive.ql.io.SingleFileSystem$GS +org.apache.hadoop.hive.ql.io.SingleFileSystem$O3FS diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java new file mode 100644 index 0000000..c7de37f --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java
new file mode 100644
index 0000000..c7de37f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestSingleFileSystem.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Scanner;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestSingleFileSystem {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private File f1;
+  private String f1path;
+  private FileSystem fs;
+
+  @Test
+  public void testAllRegistered() {
+    ServiceLoader<FileSystem> fs = ServiceLoader.load(FileSystem.class);
+    Set<Class<?>> knownFileSystems = new HashSet<>();
+
+    for (FileSystem fileSystem : fs) {
+      knownFileSystems.add(fileSystem.getClass());
+    }
+
+    for (Class<?> sfsClass : SingleFileSystem.class.getDeclaredClasses()) {
+      if (SingleFileSystem.class.isAssignableFrom(sfsClass)) {
+        if (!knownFileSystems.contains(sfsClass)) {
+          fail(sfsClass + " is not registered!");
+        }
+      }
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    f1 = folder.newFile("f1");
+    Files.write("asd", f1, Charsets.ISO_8859_1);
+    f1path = f1.toURI().toString();
+    Path p = new Path("sfs+" + f1path);
+    fs = p.getFileSystem(new Configuration());
+  }
+
+  @Test
+  public void testGetFileStatus() throws Exception {
+    assertSfsDir(fs.getFileStatus(new Path("sfs+" + folder.getRoot().toURI())));
+    assertSfsDir(fs.getFileStatus(new Path("sfs+" + f1path)));
+    assertSfsDir(fs.getFileStatus(new Path("sfs+" + f1path + "/#SINGLEFILE#")));
+    assertSfsFile(fs.getFileStatus(new Path("sfs+" + f1path + "/#SINGLEFILE#/f1")));
+  }
+
+  @Test
+  public void testListStatusSingleFileDir() throws Exception {
+    String targetSfsPath = "sfs+" + f1path + "/#SINGLEFILE#";
+    FileStatus[] list = fs.listStatus(new Path(targetSfsPath));
+    assertEquals(1, list.length);
+    assertEquals(targetSfsPath + "/f1", list[0].getPath().toString());
+  }
+
+  @Test
+  public void testListStatusSingleFileDirEndingInSlash() throws Exception {
+    String targetSfsPath = "sfs+" + f1path + "/#SINGLEFILE#/";
+    FileStatus[] list = fs.listStatus(new Path(targetSfsPath));
+    assertEquals(1, list.length);
+    assertEquals(targetSfsPath + "f1", list[0].getPath().toString());
+  }
+
+  @Test
+  public void testListStatusFile() throws Exception {
+    String targetSfsPath = "sfs+" + f1path;
+    FileStatus[] list = fs.listStatus(new Path(targetSfsPath));
+    assertEquals(1, list.length);
+    assertEquals(targetSfsPath + "/#SINGLEFILE#", list[0].getPath().toString());
+  }
+
+  @Test
+  public void testListStatusRoot() throws Exception {
+    folder.newFolder("folder");
+    String targetSfsPath = "sfs+" + folder.getRoot().toURI().toString();
+    FileStatus[] list = fs.listStatus(new Path(targetSfsPath));
+    assertEquals(2, list.length);
+
+    Set<String> expectedPaths = new HashSet<String>();
+    expectedPaths.add(targetSfsPath + "f1");
+    expectedPaths.add(targetSfsPath + "folder");
+    Set<String> paths = new HashSet<String>();
+    for (FileStatus fileStatus : list) {
+      paths.add(fileStatus.getPath().toString());
+    }
+    assertEquals(expectedPaths, paths);
+  }
+
+  @Test
+  public void testOpenTargetFile() throws Exception {
+    try (FSDataInputStream ret = fs.open(new Path("sfs+" + f1path + "/#SINGLEFILE#/f1"))) {
+      try (Scanner sc = new Scanner(ret)) {
+        String line = sc.nextLine();
+        assertEquals("asd", line);
+      }
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testOpenSinglefileDir() throws Exception {
+    fs.open(new Path("sfs+" + f1path + "/#SINGLEFILE#/"));
+  }
+
+  @Test(expected = IOException.class)
+  public void testOpenRealTargetFile() throws Exception {
+    fs.open(new Path("sfs+" + f1path));
+  }
+
+  private void assertSfsDir(FileStatus fileStatus) {
+    assertTrue(fileStatus.isDirectory());
+    assertTrue(fileStatus.getPath().toUri().getScheme().startsWith("sfs+"));
+  }
+
+  private void assertSfsFile(FileStatus fileStatus) {
+    assertTrue(!fileStatus.isDirectory());
+    assertTrue(fileStatus.getPath().toUri().getScheme().startsWith("sfs+"));
+  }
+
+}
diff --git a/ql/src/test/queries/clientpositive/sfs.q b/ql/src/test/queries/clientpositive/sfs.q
new file mode 100644
index 0000000..c58ebe1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/sfs.q
@@ -0,0 +1,17 @@
+
+dfs ${system:test.dfs.mkdir} -p file:///${system:test.tmp.dir}/sfs;
+dfs -cp ${system:hive.root}/data/files/table1_delim.txt file:///${system:test.tmp.dir}/sfs/f1.txt;
+dfs -cp ${system:hive.root}/data/files/table1_delim.txt file:///${system:test.tmp.dir}/sfs/f2.txt;
+
+create external table t1 (a string,b string,c string) location 'file://${system:test.tmp.dir}/sfs';
+
+select * from t1;
+
+create external table t1s (a string,b string,c string) location 'sfs+file://${system:test.tmp.dir}/sfs/f1.txt/#SINGLEFILE#';
+
+select * from t1s;
+
+desc formatted t1s;
+
+select count(1) from t1;
+select count(1) from t1s;
diff --git a/ql/src/test/results/clientpositive/llap/sfs.q.out b/ql/src/test/results/clientpositive/llap/sfs.q.out
new file mode 100644
index 0000000..102a6ef
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/sfs.q.out
@@ -0,0 +1,103 @@
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: select * from t1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from t1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+1	Acura	4	NULL	NULL
+2	Toyota	3	NULL	NULL
+3	Tesla	5	NULL	NULL
+4	Honda	5	NULL	NULL
+11	Mazda	2	NULL	NULL
+1	Acura	4	NULL	NULL
+2	Toyota	3	NULL	NULL
+3	Tesla	5	NULL	NULL
+4	Honda	5	NULL	NULL
+11	Mazda	2	NULL	NULL
+#### A masked pattern was here ####
+PREHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1s
+#### A masked pattern was here ####
+POSTHOOK: type: CREATETABLE
+#### A masked pattern was here ####
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1s
+PREHOOK: query: select * from t1s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1s
+#### A masked pattern was here ####
+POSTHOOK: query: select * from t1s
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1s
+#### A masked pattern was here ####
+1	Acura	4	NULL	NULL
+2	Toyota	3	NULL	NULL
+3	Tesla	5	NULL	NULL
+4	Honda	5	NULL	NULL
+11	Mazda	2	NULL	NULL
+PREHOOK: query: desc formatted t1s
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@t1s
+POSTHOOK: query: desc formatted t1s
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@t1s
+# col_name            	data_type           	comment
+a                   	string
+b                   	string
+c                   	string
+
+# Detailed Table Information
+Database:           	default
+#### A masked pattern was here ####
+Retention:          	0
+#### A masked pattern was here ####
+Table Type:         	EXTERNAL_TABLE
+Table Parameters:
+	EXTERNAL            	TRUE
+	bucketing_version   	2
+	numFiles            	1
+	totalSize           	52
+#### A masked pattern was here ####
+
+# Storage Information
+SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+InputFormat:        	org.apache.hadoop.mapred.TextInputFormat
+OutputFormat:       	org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+Compressed:         	No
+Num Buckets:        	-1
+Bucket Columns:     	[]
+Sort Columns:       	[]
+Storage Desc Params:
+	serialization.format	1
+PREHOOK: query: select count(1) from t1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select count(1) from t1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+10
+PREHOOK: query: select count(1) from t1s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1s
+#### A masked pattern was here ####
+POSTHOOK: query: select count(1) from t1s
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1s
+#### A masked pattern was here ####
+5