dramaticlly commented on code in PR #7363:
URL: https://github.com/apache/iceberg/pull/7363#discussion_r1169159619
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
}
}
+
+ /**
+ * Implements our own in-memory index which only lists directories, to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses the table's schema to
+ * visit only partition dirs, recursing as deep as the number of partition columns. Does NOT
+ * return files within a leaf dir.
+ */
+ private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {
+
+ private final Path rootPath;
+ private final FileStatusCache fileStatusCache;
+ private final SparkSession sparkSession;
+ private final StructType userSpecifiedSchema;
+ private LinkedHashMap<Path, FileStatus> cachedLeafFiles;
+ private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles;
+ private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec;
+
+ public InMemoryLeafDirOnlyIndex(
Review Comment:
I think this `public` can be omitted
```
Error: [checkstyle] [ERROR] /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:1047:5: Redundant 'public' modifier. [RedundantModifier]
```
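For reference, the same constructor as shown further down in the hunk, just with the modifier dropped, which is all the RedundantModifier rule is asking for:
```
    // no `public`: the enclosing class is private, so the modifier is redundant
    InMemoryLeafDirOnlyIndex(
        SparkSession sparkSession,
        scala.collection.immutable.Map<String, String> parameters,
        StructType userSpecifiedSchema,
        FileStatusCache fileStatusCache,
        Path rootPath) {
      super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache);
      this.fileStatusCache = fileStatusCache;
      this.rootPath = rootPath;
      this.sparkSession = sparkSession;
      this.userSpecifiedSchema = userSpecifiedSchema;
    }
```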
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
}
}
+
+ /**
+ * Implements our own in-memory index which only lists directories, to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses the table's schema to
+ * visit only partition dirs, recursing as deep as the number of partition columns. Does NOT
+ * return files within a leaf dir.
+ */
+ private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {
+
+ private final Path rootPath;
+ private final FileStatusCache fileStatusCache;
+ private final SparkSession sparkSession;
+ private final StructType userSpecifiedSchema;
+ private LinkedHashMap<Path, FileStatus> cachedLeafFiles;
+ private scala.collection.immutable.Map<Path, FileStatus[]> cachedLeafDirToChildFiles;
+ private org.apache.spark.sql.execution.datasources.PartitionSpec cachedPartitionSpec;
+
+ public InMemoryLeafDirOnlyIndex(
+ SparkSession sparkSession,
+ scala.collection.immutable.Map<String, String> parameters,
+ StructType userSpecifiedSchema,
+ FileStatusCache fileStatusCache,
+ Path rootPath) {
+ super(sparkSession, parameters, Option.apply(userSpecifiedSchema), fileStatusCache);
+ this.fileStatusCache = fileStatusCache;
+ this.rootPath = rootPath;
+ this.sparkSession = sparkSession;
+ this.userSpecifiedSchema = userSpecifiedSchema;
+ }
+
+ @Override
+ public scala.collection.Seq<Path> rootPaths() {
+ return JavaConverters.asScalaBuffer(Arrays.asList(rootPath)).seq();
+ }
+
+ @Override
+ public void refresh() {
+ fileStatusCache.invalidateAll();
+ cachedLeafFiles = null;
+ cachedLeafDirToChildFiles = null;
+ cachedPartitionSpec = null;
+ }
+
+ @Override
+ public org.apache.spark.sql.execution.datasources.PartitionSpec partitionSpec() {
+ if (cachedPartitionSpec == null) {
+ cachedPartitionSpec = inferPartitioning();
+ }
+ log.trace("Partition spec: {}", cachedPartitionSpec);
+ return cachedPartitionSpec;
+ }
+
+ @Override
+ public LinkedHashMap<Path, FileStatus> leafFiles() {
+ if (cachedLeafFiles == null) {
+ try {
+ List<FileStatus> fileStatuses =
+ listLeafDirs(sparkSession, rootPath, userSpecifiedSchema, 0);
+ LinkedHashMap<Path, FileStatus> map = new LinkedHashMap<>();
+ for (FileStatus fs : fileStatuses) {
+ map.put(fs.getPath(), fs);
+ }
+ cachedLeafFiles = map;
+ } catch (IOException e) {
+ throw new RuntimeException("error listing files for path=" + rootPath, e);
+ }
+ }
+ return cachedLeafFiles;
+ }
+
+ static List<FileStatus> listLeafDirs(
+ SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException {
+ List<FileStatus> leafDirs = new ArrayList<>();
Review Comment:
checkstyle seems to prefer `Lists.newArrayList()`
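If it helps, a sketch of the swap, assuming the relocated Guava `Lists` (`org.apache.iceberg.relocated.com.google.common.collect.Lists`) that Iceberg code normally imports. The method body below is only a guess, since this hunk shows just the declaration:
```
  // Guessed body, not the PR's actual code: descend only into directories, stop at the
  // depth given by the number of partition columns, and never list files inside leaf dirs.
  static List<FileStatus> listLeafDirs(
      SparkSession spark, Path path, StructType partitionSpec, int level) throws IOException {
    List<FileStatus> leafDirs = Lists.newArrayList();  // instead of new ArrayList<>()
    FileSystem fs = path.getFileSystem(spark.sparkContext().hadoopConfiguration());
    for (FileStatus status : fs.listStatus(path)) {
      if (!status.isDirectory()) {
        continue;
      }
      if (level == partitionSpec.fields().length - 1) {
        leafDirs.add(status);  // leaf partition dir reached
      } else {
        leafDirs.addAll(listLeafDirs(spark, status.getPath(), partitionSpec, level + 1));
      }
    }
    return leafDirs;
  }
```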
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -996,4 +1027,134 @@ public String unknown(
return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
}
}
+
+ /**
+ * Implements our own in-memory index which only lists directories, to avoid unnecessary file
+ * listings. Should ONLY be used to get partition directory paths. Uses the table's schema to
+ * visit only partition dirs, recursing as deep as the number of partition columns. Does NOT
+ * return files within a leaf dir.
+ */
+ private static class InMemoryLeafDirOnlyIndex extends PartitioningAwareFileIndex {
Review Comment:
yeah, I guess I am just curious about the customization over `leafDirToChildrenFiles`, but I have to admit there's no easy way to test this
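To make the question concrete, here is a rough guess at how that override could be shaped. Mapping each leaf dir to its own `FileStatus` is purely an assumption based on the cached field above, not something the hunks show:
```
  // Hypothetical sketch, not the PR's implementation: expose each leaf partition
  // directory as its own single "child" so the base class still sees every leaf dir
  // while no real data files are ever listed.
  @Override
  public scala.collection.immutable.Map<Path, FileStatus[]> leafDirToChildrenFiles() {
    if (cachedLeafDirToChildFiles == null) {
      scala.collection.immutable.Map<Path, FileStatus[]> map =
          scala.collection.immutable.Map$.MODULE$.empty();
      scala.collection.Iterator<FileStatus> dirs = leafFiles().valuesIterator();
      while (dirs.hasNext()) {
        FileStatus dir = dirs.next();
        map = map.updated(dir.getPath(), new FileStatus[] {dir});
      }
      cachedLeafDirToChildFiles = map;
    }
    return cachedLeafDirToChildFiles;
  }
```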