dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169153191
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
}
}
+
+ /**
+ * Implements our own in-memory index which only lists directories, to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses the table's schema to
+ * visit only partition dirs, recursing to a depth equal to the number of partition columns.
+ * Does NOT return files within the leaf dir.
+ */
+ private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {
+
+ private final Path rootPath;
+ private final FileStatusCache fileStatusCache;
+ private final SparkSession sparkSession;
+ private final StructType userSpecifiedSchema;
+ private LinkedHashMap<Path, FileStatus> cachedLeafFiles;
+ private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles;
+ private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec;
+
+ public InMemoryLeafDirOnlyIndex(
+ SparkSession sparkSession,
+ scala.collection.immutable.Map<String, String> parameters,
+ StructType userSpecifiedSchema,
+ FileStatusCache fileStatusCache,
+ Path rootPath) {
+ super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache);
+ this.fileStatusCache = fileStatusCache;
+ this.rootPath = rootPath;
+ this.sparkSession = sparkSession;
+ this.userSpecifiedSchema = userSpecifiedSchema;
+ }
+
+ @Override
+ public scala.collection.Seq<Path> rootPaths() {
+ return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq();
+ }
+
+ @Override
+ public void refresh() {
+ fileStatusCache.invalidateAll();
+ cachedLeafFiles = null;
+ cachedLeafDirToChildFiles = null;
+ cachedPartitionSpec = null;
+ }
+
+ @Override
+ public org.apache.spark.sql.execution.datasources.PartitionSpec partitionSpec() {
+ if (cachedPartitionSpec == null) {
+ cachedPartitionSpec = inferPartitioning();
+ }
+ log.trace("Partition spec: {}", cachedPartitionSpec);
+ return cachedPartitionSpec;
+ }
+
+ @Override
+ public LinkedHashMap<Path, FileStatus> leafFiles() {
+ if (cachedLeafFiles == null) {
+ try {
+ List<FileStatus> fileStatuses =
+ listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0);
+ LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>();
+ for (FileStatus fs : fileStatuses) {
+ map.put(fs.getPath(), fs);
+ }
+ cachedLeafFiles = map;
+ } catch (IOException e) {
+ throw new RuntimeException("error listing files for path=" + rootPath, e);
+ }
+ }
+ return cachedLeafFiles;
+ }
+
+ static List<FileStatus> listLeafDirs(
+ SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException {
+ List<FileStatus> leafDirs = new ArrayList<>();
+ int numPartitionCols = partitionSpec.fields().length;
+ if (level < numPartitionCols) {
+ try (FileSystem fs = path.getFileSystem(spark.sparkContext().hadoopConfiguration())) {
+ List<FileStatus> dirs =
+ Stream.of(fs.listStatus(path))
+ .filter(FileStatus::isDirectory)
+ .collect(Collectors.toList());
+ for (FileStatus dir : dirs) {
+ // stop recursive call once we reach the expected end of partitions as per table schema
+ if (level == numPartitionCols - 1) {
+ leafDirs.add(dir);
+ } else {
+ leafDirs.addAll(listLeafDirs(spark, dir.getPath(), partitionSpec, level + 1));
+ }
+ }
+ }
+ }
+ return leafDirs;
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Path, FileStatus[]> leafDirToChildrenFiles() {
+ if (cachedLeafDirToChildFiles == null) {
+ List<Tuple2<Path, FileStatus[]>> tuple2s =
+ JavaConverters.seqAsJavaList(leafFiles().values().toSeq()).stream()
+ .map(
+ fileStatus -> {
+ // Create an empty data file in the leaf dir.
+ // As this index is only used to list partition directories,
+ // we can stop listing the leaf dir to avoid unnecessary listing which can
+ // take a while on folders with 1000s of files
+ return new Tuple2<>(
+ fileStatus.getPath(),
+ new FileStatus[] {createEmptyChildDataFileStatus(fileStatus)});
+ })
+ .collect(Collectors.toList());
+ cachedLeafDirToChildFiles =
+ (scala.collection.immutable.Map<Path, FileStatus[]>) Map$.MODULE$.apply(JavaConverters.asScalaBuffer(tuple2s));
+ }
+ return cachedLeafDirToChildFiles;
+ }
+
+ private FileStatus createEmptyChildDataFileStatus(FileStatus fileStatus) {
+ return new FileStatus(
+ 1L,
+ false,
+ fileStatus.getReplication(),
+ 1L,
+ fileStatus.getModificationTime(),
+ fileStatus.getAccessTime(),
+ fileStatus.getPermission(),
+ fileStatus.getOwner(),
+ fileStatus.getGroup(),
+ new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile"));
+ }
Review Comment:
maybe add a comment beside the boolean parameter to help with reading?
https://github.com/apache/iceberg/blob/master/CONTRIBUTING.md#boolean-arguments
```java
    return new FileStatus(
        1L,
        false, /* is directory */
        fileStatus.getReplication(),
        1L,
        fileStatus.getModificationTime(),
        fileStatus.getAccessTime(),
        fileStatus.getPermission(),
        fileStatus.getOwner(),
        fileStatus.getGroup(),
        new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile"));
  }
```
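
If an inline comment feels easy to miss, a named local works too. This is only a sketch of the same call, not something the guideline requires; `isDirectory` is an illustrative variable name, not part of the PR:

```java
// Sketch: name the boolean so the call site reads without a trailing comment.
// isDirectory is a hypothetical local introduced for illustration only.
boolean isDirectory = false;
return new FileStatus(
    1L,
    isDirectory,
    fileStatus.getReplication(),
    1L,
    fileStatus.getModificationTime(),
    fileStatus.getAccessTime(),
    fileStatus.getPermission(),
    fileStatus.getOwner(),
    fileStatus.getGroup(),
    new Path(fileStatus.getPath(), fileStatus.getPath().toString() + "/dummyDataFile"));
```

Either way the intent is the same: make it obvious at the call site that the dummy child status is a file, not a directory.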