This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ac8733d7b Core: Support FileIO prefix operations (#5096)
ac8733d7b is described below

commit ac8733d7b9cf182bcee62d67cec4e1342efb6387
Author: Daniel Weeks <[email protected]>
AuthorDate: Wed Jun 22 13:40:33 2022 -0700

    Core: Support FileIO prefix operations (#5096)
---
 .../main/java/org/apache/iceberg/io/FileInfo.java  |  44 +++++++++
 .../iceberg/io/SupportsPrefixOperations.java       |  51 ++++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  33 ++++++-
 .../main/java/org/apache/iceberg/aws/s3/S3URI.java |  11 +++
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    |  42 +++++++++
 .../org/apache/iceberg/hadoop/HadoopFileIO.java    |  68 +++++++++++++-
 .../apache/iceberg/hadoop/HadoopFileIOTest.java    | 104 +++++++++++++++++++++
 7 files changed, 351 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/FileInfo.java b/api/src/main/java/org/apache/iceberg/io/FileInfo.java
new file mode 100644
index 000000000..63a72c283
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/FileInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+/**
+ * Immutable description of a single file: where it lives, how large it is, and a
+ * creation timestamp in milliseconds.
+ */
+public class FileInfo {
+  private final String location;
+  private final long size;
+  private final long createdAtMillis;
+
+  /**
+   * Creates a file description from the three values it carries.
+   *
+   * @param location location of the file
+   * @param size size of the file
+   * @param createdAtMillis timestamp, in milliseconds, recorded for the file
+   */
+  public FileInfo(String location, long size, long createdAtMillis) {
+    this.location = location;
+    this.size = size;
+    this.createdAtMillis = createdAtMillis;
+  }
+
+  /**
+   * @return the location passed to the constructor
+   */
+  public String location() {
+    return this.location;
+  }
+
+  /**
+   * @return the size passed to the constructor
+   */
+  public long size() {
+    return this.size;
+  }
+
+  /**
+   * @return the millisecond timestamp passed to the constructor
+   */
+  public long createdAtMillis() {
+    return this.createdAtMillis;
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java b/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
new file mode 100644
index 000000000..fc65f38ab
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+/**
+ * Optional extension for FileIO implementations that can operate on every
+ * file sharing a common location prefix, rather than on one file at a time.
+ * It adds two operations: listing the files under a prefix and deleting them.
+ */
+public interface SupportsPrefixOperations {
+
+  /**
+   * Return an iterable of all files under a prefix.
+   * <p>
+   * Hierarchical file systems (e.g. HDFS) may impose additional restrictions,
+   * such as requiring the prefix to fully match a directory, whereas key/value
+   * object stores may allow arbitrary prefixes.
+   *
+   * @param prefix prefix to list
+   * @return iterable of {@link FileInfo}, one entry per file found under the prefix
+   */
+  Iterable<FileInfo> listPrefix(String prefix);
+
+  /**
+   * Delete all files under a prefix.
+   * <p>
+   * Hierarchical file systems (e.g. HDFS) may impose additional restrictions,
+   * such as requiring the prefix to fully match a directory, whereas key/value
+   * object stores may allow arbitrary prefixes.
+   *
+   * @param prefix prefix to delete
+   */
+  void deletePrefix(String prefix);
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 3f669a3b6..3cd557c37 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,15 +33,18 @@ import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.CredentialSupplier;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.metrics.MetricsContext;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -54,6 +57,7 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
 import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
 import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -67,7 +71,7 @@ import software.amazon.awssdk.services.s3.model.Tagging;
  * URIs with schemes s3a, s3n, https are also treated as s3 file paths.
  * Using this FileIO with other schemes will result in {@link 
org.apache.iceberg.exceptions.ValidationException}.
  */
-public class S3FileIO implements FileIO, SupportsBulkOperations, 
CredentialSupplier {
+public class S3FileIO implements FileIO, SupportsBulkOperations, 
SupportsPrefixOperations, CredentialSupplier {
   private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
   private static final String DEFAULT_METRICS_IMPL = 
"org.apache.iceberg.hadoop.HadoopMetricsContext";
   private static volatile ExecutorService executorService;
@@ -241,6 +245,33 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, CredentialSuppl
     return Lists.newArrayList();
   }
 
+  /**
+   * Lists every object whose key starts with the key portion of the given prefix.
+   * <p>
+   * The list request is built once, but the paginated listing is started anew each time
+   * an iterator is requested from the returned iterable. Each result's location is rebuilt
+   * from the scheme and bucket of the given prefix plus the object's key, and its timestamp
+   * is taken from the object's last-modified time.
+   *
+   * @param prefix prefix to list
+   * @return iterable of file information for the objects under the prefix
+   */
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    S3URI s3uri = new S3URI(prefix, awsProperties.s3BucketToAccessPointMapping());
+    ListObjectsV2Request request = ListObjectsV2Request.builder()
+        .bucket(s3uri.bucket())
+        .prefix(s3uri.key())
+        .build();
+
+    return () -> client().listObjectsV2Paginator(request).stream()
+        .flatMap(page -> page.contents().stream())
+        .map(object -> {
+          String location = String.format("%s://%s/%s", s3uri.scheme(), s3uri.bucket(), object.key());
+          return new FileInfo(location, object.size(), object.lastModified().toEpochMilli());
+        })
+        .iterator();
+  }
+
+  /**
+   * Makes a "best-effort" attempt to delete all objects under the given prefix.
+   * <p>
+   * The locations returned by {@link #listPrefix(String)} are handed to the bulk delete
+   * operation. Failed deletes are not reattempted, but any individual objects that are
+   * not deleted as part of the bulk operation are logged.
+   *
+   * @param prefix prefix to delete
+   */
+  @Override
+  public void deletePrefix(String prefix) {
+    Iterable<String> locations = () -> Streams.stream(listPrefix(prefix)).map(FileInfo::location).iterator();
+    deleteFiles(locations);
+  }
+
   private S3Client client() {
     if (client == null) {
       synchronized (this) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
index 3398cbac9..6a798b183 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
@@ -42,6 +42,7 @@ class S3URI {
   private static final String FRAGMENT_DELIM = "#";
 
   private final String location;
+  private final String scheme;
   private final String bucket;
   private final String key;
 
@@ -73,6 +74,7 @@ class S3URI {
     this.location = location;
     String [] schemeSplit = location.split(SCHEME_DELIM, -1);
     ValidationException.check(schemeSplit.length == 2, "Invalid S3 URI, cannot 
determine scheme: %s", location);
+    this.scheme = schemeSplit[0];
 
     String [] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
     ValidationException.check(authoritySplit.length == 2, "Invalid S3 URI, 
cannot determine bucket: %s", location);
@@ -108,6 +110,15 @@ class S3URI {
     return location;
   }
 
+  /**
+   * Returns the scheme exactly as it appeared in the location this URI was built from,
+   * i.e. the text preceding the scheme delimiter.
+   *
+   * @return uri scheme
+   */
+  public String scheme() {
+    return this.scheme;
+  }
+
   @Override
   public String toString() {
     return location;
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index b9d7496c3..a34300aa6 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,13 +35,16 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
+import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -190,4 +193,43 @@ public class TestS3FileIO {
 
     assertEquals("s3", post.get().serviceName());
   }
+
+  @Test
+  public void testPrefixList() {
+    String prefix = "s3://bucket/path/to/list";
+    List<Integer> objectCounts = Lists.newArrayList(1, 1000, 2500);
+
+    // Populate one sub-prefix per count (concurrently) and check each lists exactly its own objects.
+    objectCounts.parallelStream().forEach(count -> {
+      String subPrefix = String.format("%s/%s/", prefix, count);
+      createRandomObjects(subPrefix, count);
+
+      assertEquals((long) count, Streams.stream(s3FileIO.listPrefix(subPrefix)).count());
+    });
+
+    // Listing the parent prefix must return the objects of every sub-prefix combined.
+    long expectedTotal = objectCounts.stream().mapToLong(Integer::longValue).sum();
+    long listedTotal = Streams.stream(s3FileIO.listPrefix(prefix)).count();
+    Assertions.assertEquals(expectedTotal, listedTotal);
+  }
+
+  @Test
+  public void testPrefixDelete() {
+    String prefix = "s3://bucket/path/to/delete";
+    List<Integer> scaleSizes = Lists.newArrayList(0, 5, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      String scalePrefix = String.format("%s/%s/", prefix, scale);
+
+      createRandomObjects(scalePrefix, scale);
+      s3FileIO.deletePrefix(scalePrefix);
+      assertEquals(0L, 
Streams.stream(s3FileIO.listPrefix(scalePrefix)).count());
+    });
+  }
+
+  /**
+   * Puts {@code count} empty objects into the mock store. Each key is the key portion of
+   * {@code prefix} followed by a random integer.
+   */
+  private void createRandomObjects(String prefix, int count) {
+    S3URI uri = new S3URI(prefix);
+    String bucket = uri.bucket();
+    String keyPrefix = uri.key();
+
+    random.ints(count).parallel().forEach(suffix ->
+        s3mock.putObject(request -> request.bucket(bucket).key(keyPrefix + suffix).build(), RequestBody.empty())
+    );
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 1c53240db..f3c1db18e 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -20,17 +20,23 @@
 package org.apache.iceberg.hadoop;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableSupplier;
 
-public class HadoopFileIO implements FileIO, HadoopConfigurable {
+public class HadoopFileIO implements FileIO, HadoopConfigurable, 
SupportsPrefixOperations {
 
   private SerializableSupplier<Configuration> hadoopConf;
 
@@ -89,4 +95,64 @@ public class HadoopFileIO implements FileIO, HadoopConfigurable {
   public void serializeConfWith(Function<Configuration, 
SerializableSupplier<Configuration>> confSerializer) {
     this.hadoopConf = confSerializer.apply(getConf());
   }
+
+  /**
+   * Recursively lists the files under the given path.
+   * <p>
+   * The file system is resolved immediately, but the listing itself is started each time
+   * an iterator is requested from the returned iterable. An {@link IOException} raised when
+   * starting the listing is rethrown as {@link UncheckedIOException}.
+   *
+   * @param prefix path to list
+   * @return iterable of file information built from each file's path, length and modification time
+   */
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    Path root = new Path(prefix);
+    FileSystem fileSystem = Util.getFs(root, hadoopConf.get());
+
+    return () -> {
+      try {
+        return Streams.stream(new AdaptingIterator<>(fileSystem.listFiles(root, true /* recursive */)))
+            .map(status -> {
+              String location = status.getPath().toString();
+              return new FileInfo(location, status.getLen(), status.getModificationTime());
+            })
+            .iterator();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    };
+  }
+
+  /**
+   * Recursively deletes the given path. The boolean result of the underlying delete call
+   * is not inspected; an {@link IOException} is rethrown as {@link UncheckedIOException}.
+   *
+   * @param prefix path to delete
+   */
+  @Override
+  public void deletePrefix(String prefix) {
+    Path root = new Path(prefix);
+    FileSystem fileSystem = Util.getFs(root, hadoopConf.get());
+
+    try {
+      fileSystem.delete(root, true /* recursive */);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Wraps a Hadoop {@link RemoteIterator} so it can be consumed as a plain
+   * {@link Iterator}: every call is forwarded to the wrapped iterator and any
+   * {@link IOException} it raises is rethrown as {@link UncheckedIOException}.
+   *
+   * @param <E> element type
+   */
+  private static class AdaptingIterator<E> implements Iterator<E>, RemoteIterator<E> {
+    private final RemoteIterator<E> remote;
+
+    AdaptingIterator(RemoteIterator<E> delegate) {
+      this.remote = delegate;
+    }
+
+    @Override
+    public boolean hasNext() {
+      final boolean more;
+      try {
+        more = remote.hasNext();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+      return more;
+    }
+
+    @Override
+    public E next() {
+      final E element;
+      try {
+        element = remote.next();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+      return element;
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
new file mode 100644
index 000000000..0721d6999
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests the prefix operations ({@code listPrefix} and {@code deletePrefix}) of
+ * {@link HadoopFileIO} against the local Hadoop file system.
+ */
+public class HadoopFileIOTest {
+  private final Random random = new Random(1);
+
+  private FileSystem fs;
+  private HadoopFileIO hadoopFileIO;
+
+  // Deliberately an instance field rather than a static one: each test method then gets its
+  // own directory, so the exact file counts asserted in testListPrefix cannot be affected by
+  // files created, or by the root directory being deleted, in testDeletePrefix.
+  @TempDir
+  File tempDir;
+
+  @BeforeEach
+  public void before() throws Exception {
+    Configuration conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+
+    hadoopFileIO = new HadoopFileIO(conf);
+  }
+
+  /**
+   * Creates one sub-directory per scale, fills it with that many files, and checks that
+   * listing each sub-directory and then the parent returns the expected number of files.
+   */
+  @Test
+  public void testListPrefix() {
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      Path scalePath = new Path(parent, Integer.toString(scale));
+
+      createRandomFiles(scalePath, scale);
+      assertEquals((long) scale, Streams.stream(hadoopFileIO.listPrefix(scalePath.toUri().toString())).count());
+    });
+
+    long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+    assertEquals(totalFiles, Streams.stream(hadoopFileIO.listPrefix(parent.toUri().toString())).count());
+  }
+
+  /**
+   * Deletes each populated sub-directory, then the parent, and checks after every delete
+   * that listing the removed path fails.
+   */
+  @Test
+  public void testDeletePrefix() {
+    Path parent = new Path(tempDir.toURI());
+
+    List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      Path scalePath = new Path(parent, Integer.toString(scale));
+
+      createRandomFiles(scalePath, scale);
+      hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+      // Hadoop filesystem will throw if the path does not exist
+      assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator());
+    });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator());
+  }
+
+  /**
+   * Creates {@code count} empty files named {@code file-<random int>} under {@code parent},
+   * rethrowing any {@link IOException} as {@link UncheckedIOException}.
+   */
+  private void createRandomFiles(Path parent, int count) {
+    random.ints(count).parallel().forEach(i -> {
+          try {
+            fs.createNewFile(new Path(parent, "file-" + i));
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        }
+    );
+  }
+}

Reply via email to