This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 1aae437 [HUDI-1102] Add common useful Spark related and Table path detection utilities (#1841) 1aae437 is described below commit 1aae437257cfd94fd277cf667257f6abffcc0c21 Author: Udit Mehrotra <umehr...@illinois.edu> AuthorDate: Sat Jul 18 16:16:32 2020 -0700 [HUDI-1102] Add common useful Spark related and Table path detection utilities (#1841) Co-authored-by: Mehrotra <udi...@amazon.com> --- .../hudi/common/table/HoodieTableMetaClient.java | 1 + .../apache/hudi/common/util/TablePathUtils.java | 110 ++++++++++++++++++ .../hudi/common/util/TestTablePathUtils.java | 126 +++++++++++++++++++++ .../main/java/org/apache/hudi/DataSourceUtils.java | 23 ++++ .../scala/org/apache/hudi/HudiSparkUtils.scala | 50 ++++++++ .../scala/org/apache/hudi/TestHudiSparkUtils.scala | 105 +++++++++++++++++ 6 files changed, 415 insertions(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9675b77..b047595 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -73,6 +73,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String METAFOLDER_NAME = ".hoodie"; public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp"; public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux"; + public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + File.separator + ".bootstrap"; public static final String MARKER_EXTN = ".marker"; private String basePath; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java new file mode 100644 index 
0000000..6982fdb --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java @@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.common.util;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Helpers that infer the base path of a Hudi table from a path pointing at the table folder,
 * at its metadata folder (or anything below it), at a partition folder, or at a file located
 * directly inside one of those folders.
 */
public class TablePathUtils {

  private static final Logger LOG = LogManager.getLogger(TablePathUtils.class);

  /** The metadata folder as a trailing path component, i.e. {@code "/" + METAFOLDER_NAME}. */
  private static final String METAFOLDER_SUFFIX = "/" + HoodieTableMetaClient.METAFOLDER_NAME;

  /** The metadata folder as an inner path component, i.e. {@code "/" + METAFOLDER_NAME + "/"}. */
  private static final String METAFOLDER_INFIX = METAFOLDER_SUFFIX + "/";

  /**
   * Checks whether {@code path} directly contains the Hudi metadata folder.
   *
   * @param fs   file system used for the existence check
   * @param path folder to inspect; {@code null} yields {@code false}
   * @return {@code true} if {@code path/METAFOLDER_NAME} exists
   * @throws HoodieException if the existence check fails with an {@link IOException}
   */
  private static boolean hasTableMetadataFolder(FileSystem fs, Path path) {
    if (path == null) {
      return false;
    }

    try {
      return fs.exists(new Path(path, HoodieTableMetaClient.METAFOLDER_NAME));
    } catch (IOException ioe) {
      throw new HoodieException("Error checking Hoodie metadata folder for " + path, ioe);
    }
  }

  /**
   * Infers the table base path for {@code path}.
   *
   * <p>If {@code path} is a file, its parent folder is examined instead. The folder is then
   * tried, in order, as the table folder itself, as the metadata folder or a folder inside it,
   * and finally as a partition folder.
   *
   * @param fs   file system the path lives on
   * @param path file or folder to start from
   * @return the inferred table path, or an empty option when none of the strategies match
   * @throws HoodieException if probing the metadata folder or reading partition metadata fails,
   *                         or if the partition metadata reports a depth deeper than the path
   * @throws IOException     if the status of {@code path} cannot be obtained
   */
  public static Option<Path> getTablePath(FileSystem fs, Path path) throws HoodieException, IOException {
    LOG.info("Getting table path from path : " + path);

    FileStatus fileStatus = fs.getFileStatus(path);
    Path directory = fileStatus.isFile() ? fileStatus.getPath().getParent() : fileStatus.getPath();

    if (TablePathUtils.hasTableMetadataFolder(fs, directory)) {
      // Handle table folder itself
      return Option.of(directory);
    }

    // Handle metadata folder or metadata sub folder path
    Option<Path> tablePath = getTablePathFromTableMetadataPath(directory);
    if (tablePath.isPresent()) {
      return tablePath;
    }

    // Handle partition folder
    return getTablePathFromPartitionPath(fs, directory);
  }

  /** Returns {@code true} when the last component of {@code path} is the metadata folder. */
  private static boolean isTableMetadataFolder(String path) {
    return path != null && path.endsWith(METAFOLDER_SUFFIX);
  }

  /** Returns {@code true} when some inner component of {@code path} is the metadata folder. */
  private static boolean isInsideTableMetadataFolder(String path) {
    return path != null && path.contains(METAFOLDER_INFIX);
  }

  /**
   * Resolves the table path when {@code path} is the metadata folder or lies below it.
   *
   * @param path folder to inspect
   * @return the folder that contains the metadata folder, or an empty option if {@code path}
   *         is neither the metadata folder nor inside it
   */
  private static Option<Path> getTablePathFromTableMetadataPath(Path path) {
    String pathStr = path.toString();

    if (isTableMetadataFolder(pathStr)) {
      return Option.of(path.getParent());
    } else if (isInsideTableMetadataFolder(pathStr)) {
      // Search for the complete component ("/<meta>/"), not just the "/<meta>" prefix: a sibling
      // folder whose name merely starts with the metadata folder name (e.g. "<meta>_backup")
      // could otherwise be matched first and truncate the path too early.
      int index = pathStr.indexOf(METAFOLDER_INFIX);
      return Option.of(new Path(pathStr.substring(0, index)));
    }

    return Option.empty();
  }

  /**
   * Resolves the table path when {@code partitionPath} is a partition folder, by walking up as
   * many levels as the depth reported by the folder's partition metadata.
   *
   * @param fs            file system the partition lives on
   * @param partitionPath candidate partition folder
   * @return the inferred table path, or an empty option if
   *         {@link HoodiePartitionMetadata#hasPartitionMetadata} reports no metadata for the folder
   * @throws HoodieException if the partition metadata cannot be read, or if its depth exceeds
   *                         the number of ancestors of {@code partitionPath}
   */
  private static Option<Path> getTablePathFromPartitionPath(FileSystem fs, Path partitionPath) {
    try {
      if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
        metadata.readFromFS();
        return Option.of(getNthParent(partitionPath, metadata.getPartitionDepth()));
      }
    } catch (IOException ioe) {
      throw new HoodieException("Error reading partition metadata for " + partitionPath, ioe);
    }

    return Option.empty();
  }

  /**
   * Returns the {@code n}-th ancestor of {@code path} ({@code n == 0} returns {@code path}).
   *
   * @throws HoodieException if {@code path} has fewer than {@code n} ancestors
   */
  private static Path getNthParent(Path path, int n) {
    Path parent = path;
    for (int i = 0; i < n; i++) {
      parent = parent.getParent();
      if (parent == null) {
        // getParent() yields null once the root is passed; fail with context instead of
        // letting the next iteration (or the caller) hit a NullPointerException.
        throw new HoodieException("Path " + path + " has fewer than " + n + " parent folders");
      }
    }
    return parent;
  }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java new file mode 100644 index 0000000..05031f0 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Paths; +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public final class TestTablePathUtils { + + @TempDir + static File tempDir; + private static FileSystem fs; + private static Path tablePath; + private static Path partitionPath1; + private static Path partitionPath2; + private static Path filePath1; + private static Path filePath2; + + @BeforeAll + static void setup() throws IOException { + URI tablePathURI = Paths.get(tempDir.getAbsolutePath(),"test_table").toUri(); + tablePath = new Path(tablePathURI); + fs = tablePath.getFileSystem(new Configuration()); + + // Create bootstrap index folder + assertTrue(new File( + Paths.get(tablePathURI.getPath(), HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH).toUri()).mkdirs()); + + // Create partition folders + URI partitionPathURI1 = Paths.get(tablePathURI.getPath(),"key1=abc/key2=def").toUri(); + partitionPath1 = new Path(partitionPathURI1); + URI partitionPathURI2 = Paths.get(tablePathURI.getPath(),"key1=xyz/key2=def").toUri(); + partitionPath2 = new Path(partitionPathURI2); + + assertTrue(new File(partitionPathURI1).mkdirs()); + assertTrue(new File(partitionPathURI2).mkdirs()); + + HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath, + partitionPath1); + partitionMetadata1.trySave(1); + HoodiePartitionMetadata partitionMetadata2 = new 
HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath, + partitionPath2); + partitionMetadata2.trySave(2); + + // Create files + URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1.parquet").toUri(); + filePath1 = new Path(filePathURI1); + URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2.parquet").toUri(); + filePath2 = new Path(filePathURI2); + + assertTrue(new File(filePathURI1).createNewFile()); + assertTrue(new File(filePathURI2).createNewFile()); + } + + @Test + void getTablePathFromTablePath() throws IOException { + Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, tablePath); + assertEquals(tablePath, inferredTablePath.get()); + } + + @Test + void getTablePathFromMetadataFolderPath() throws IOException { + Path metadataFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME); + Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, metadataFolder); + assertEquals(tablePath, inferredTablePath.get()); + } + + @Test + void getTablePathFromMetadataSubFolderPath() throws IOException { + Path auxFolder = new Path(tablePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, auxFolder); + assertEquals(tablePath, inferredTablePath.get()); + + Path bootstrapIndexFolder = new Path(tablePath, HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH); + inferredTablePath = TablePathUtils.getTablePath(fs, bootstrapIndexFolder); + assertEquals(tablePath, inferredTablePath.get()); + } + + @Test + void getTablePathFromPartitionFolderPath() throws IOException { + Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath1); + assertEquals(tablePath, inferredTablePath.get()); + + inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath2); + assertEquals(tablePath, inferredTablePath.get()); + } + + @Test + void getTablePathFromFilePath() throws IOException { + Option<Path> inferredTablePath = 
TablePathUtils.getTablePath(fs, filePath1); + assertEquals(tablePath, inferredTablePath.get()); + + inferredTablePath = TablePathUtils.getTablePath(fs, filePath2); + assertEquals(tablePath, inferredTablePath.get()); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index fe68af9..36212d0 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -18,6 +18,8 @@ package org.apache.hudi; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; @@ -28,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -45,6 +48,8 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -61,6 +66,8 @@ import java.util.stream.Collectors; */ public class DataSourceUtils { + private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class); + /** * Obtain value of the provided field as string, denoted by dot notation. 
e.g: a.b.c */ @@ -105,6 +112,22 @@ public class DataSourceUtils { } } + public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException { + LOG.info("Getting table path.."); + for (Path path: userProvidedPaths) { + try { + Option<Path> tablePath = TablePathUtils.getTablePath(fs, path); + if (tablePath.isPresent()) { + return tablePath.get().toString(); + } + } catch (HoodieException he) { + LOG.warn("Error trying to get table path from " + path.toString(), he); + } + } + + throw new TableNotFoundException("Unable to find a hudi table for the user provided paths."); + } + /** * This method converts values for fields with certain Avro/Parquet data types that require special handling. * diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala new file mode 100644 index 0000000..861de14 --- /dev/null +++ b/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.hudi

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._


/**
 * Spark-side helpers: the schema of the Hudi meta columns, glob expansion of user supplied
 * paths, and construction of an [[InMemoryFileIndex]] over already-expanded paths.
 */
object HudiSparkUtils {

  /**
   * Builds a schema with one nullable string field per entry of
   * `HoodieRecord.HOODIE_META_COLUMNS`, in the order of that collection.
   */
  def getHudiMetadataSchema: StructType = {
    val metaFields =
      for (columnName <- HoodieRecord.HOODIE_META_COLUMNS.asScala)
        yield StructField(columnName, StringType, nullable = true)
    StructType(metaFields)
  }

  /**
   * Qualifies every path against `fs` (its URI and working directory) and passes it through
   * `SparkHadoopUtil.globPathIfNecessary`, concatenating the results in input order.
   *
   * @param paths raw path strings, possibly containing glob patterns
   * @param fs    file system used for qualification and globbing
   */
  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
    paths.flatMap { rawPath =>
      val qualifiedPath = new Path(rawPath).makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedPath)
    }
  }

  /**
   * Creates an [[InMemoryFileIndex]] over `globbedPaths` with no extra parameters, no
   * user-specified schema, and the session's [[FileStatusCache]].
   */
  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
    new InMemoryFileIndex(
      sparkSession,
      globbedPaths,
      Map(),
      Option.empty,
      FileStatusCache.getOrCreate(sparkSession))
  }
}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala new file mode 100644 index 0000000..6b1a178 --- /dev/null +++ b/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.hudi

import java.io.File
import java.nio.file.Paths

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir

/**
 * Tests for [[HudiSparkUtils]] against a small on-disk layout:
 * `folder1/{file1,file2}` and `folder2/{file3,file4}` below a JUnit temp directory.
 */
class TestHudiSparkUtils {

  /** Builds the Hadoop path for `segments` below `root` (nothing is created on disk). */
  private def pathUnder(root: File, segments: String*): Path =
    new Path(Paths.get(root.getAbsolutePath, segments: _*).toUri)

  /**
   * Creates the two folders and four empty files below `root` and returns
   * `(folders, files)`. All paths are computed before anything is created on disk.
   */
  private def createLayout(root: File): (Seq[Path], Seq[Path]) = {
    val folders: Seq[Path] = Seq(
      pathUnder(root, "folder1"),
      pathUnder(root, "folder2")
    )
    val files: Seq[Path] = Seq(
      pathUnder(root, "folder1", "file1"),
      pathUnder(root, "folder1", "file2"),
      pathUnder(root, "folder2", "file3"),
      pathUnder(root, "folder2", "file4")
    )

    folders.foreach(dir => new File(dir.toUri).mkdir())
    files.foreach(f => new File(f.toUri).createNewFile())
    (folders, files)
  }

  /** Sorts paths by their string form so comparisons do not depend on listing order. */
  private def ordered(paths: Seq[Path]): Seq[Path] = paths.sortBy(_.toString)

  /** Expands `patterns` using the file system resolved from the first pattern. */
  private def glob(patterns: Seq[String]): Seq[Path] = {
    val fs = new Path(patterns.head).getFileSystem(new Configuration())
    HudiSparkUtils.checkAndGlobPathIfNecessary(patterns, fs)
  }

  @Test
  def testGlobPaths(@TempDir tempDir: File): Unit = {
    val (folders, files) = createLayout(tempDir)
    val root = tempDir.getAbsolutePath

    // One level of wildcard matches the folders, two levels match every file
    assertEquals(ordered(folders), ordered(glob(Seq(root + "/*"))))
    assertEquals(ordered(files), ordered(glob(Seq(root + "/*/*"))))

    // Per-folder wildcards match only that folder's files
    assertEquals(ordered(files.take(2)), ordered(glob(Seq(root + "/folder1/*"))))
    assertEquals(ordered(files.drop(2)), ordered(glob(Seq(root + "/folder2/*"))))

    // Several patterns are expanded and concatenated
    assertEquals(ordered(files), ordered(glob(Seq(root + "/folder1/*", root + "/folder2/*"))))
  }

  @Test
  def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
    val spark = SparkSession.builder
      .appName("Hoodie Datasource test")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate

    val (folders, files) = createLayout(tempDir)

    val fileIndex = HudiSparkUtils.createInMemoryFileIndex(spark, folders)
    val indexedFilePaths = fileIndex.allFiles().map(status => status.getPath)
    assertEquals(ordered(files), ordered(indexedFilePaths))
  }
}