Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-05-26 Thread via GitHub


catkins commented on PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#issuecomment-2132549187

   This would be a great addition. I got caught out in development this week 
when setting up an Iceberg sink on my local Docker Flink setup: I was confused 
why the S3 config from the Flink config wasn't being picked up by Iceberg.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-25 Thread via GitHub


stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1580188262


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    Path toDelete = new Path(path);
+    try {
+      toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+    }
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    LOG.debug("Listing {}", prefix);
+    Path prefixToList = new Path(prefix);
+    try {
+      return listPrefix(prefixToList.getFileSystem(), prefixToList);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
+    }
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+    Path prefixToDelete = new Path(prefix);
+
+    try {
+      prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+    }
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
   I meant that if the underlying storage is S3, `S3FileIO` should probably be 
used (instead of `HadoopFileIO`).




Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-25 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1580019899


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
   Why?
   For the record, the implementation for S3 is based on AWS packages: 
https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-base

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-25 Thread via GitHub


stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1579860068


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
   If the storage is S3, `HadoopFileIO` shouldn't be used.




Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-23 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576011008


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
   Same as HadoopFileIO
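
The pattern in the hunk above (parallel per-file deletes, a bounded number of retries, no retry on `FileNotFoundException`, and a failure counter) can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not the PR's code: `BulkDeleteSketch`, `Deleter`, `deleteWithRetry`, and `deleteAll` are hypothetical names, and a fixed thread pool stands in for Iceberg's `Tasks` utility.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class BulkDeleteSketch {
  /** Hypothetical single-file delete; stands in for a FileIO-style deleteFile call. */
  interface Deleter {
    void delete(String path) throws IOException;
  }

  /** Tries one path up to (retries + 1) times; returns true once a delete succeeds. */
  static boolean deleteWithRetry(String path, Deleter deleter, int retries) {
    for (int attempt = 0; attempt <= retries; attempt++) {
      try {
        deleter.delete(path);
        return true;
      } catch (FileNotFoundException e) {
        return false; // a missing file will not reappear, so stop retrying
      } catch (IOException e) {
        // possibly transient: loop around and try again
      }
    }
    return false;
  }

  /** Deletes all paths in parallel and returns how many of them could not be deleted. */
  static int deleteAll(List<String> paths, Deleter deleter, int retries, int threads) {
    AtomicInteger failureCount = new AtomicInteger(0);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (String path : paths) {
      tasks.add(
          () -> {
            if (!deleteWithRetry(path, deleter, retries)) {
              failureCount.incrementAndGet();
            }
            return null;
          });
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      pool.invokeAll(tasks); // blocks until every task has completed
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while deleting files", e);
    } finally {
      pool.shutdown();
    }
    return failureCount.get();
  }

  public static void main(String[] args) {
    // Assumed behaviour for the demo: any path ending in "missing" does not exist.
    Deleter deleter =
        path -> {
          if (path.endsWith("missing")) {
            throw new FileNotFoundException(path);
          }
        };
    int failures = deleteAll(List.of("data/a", "data/missing"), deleter, 3, 2);
    System.out.println("failed deletes: " + failures);
  }
}
```

The quoted hunk stops at `.run(this::deleteFile)`, so what happens to the failure count afterwards is not visible here; the `throws BulkDeletionFailureException` clause suggests it is reported to the caller.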




Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-23 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576012292


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";

Review Comment:
   Let's call it `flinkfilesystem` - I think it is for the best
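
To show where such a property name is consumed, here is a minimal sketch of a lazily created, shared delete pool whose size comes from a configurable key. Everything in it is an assumption for illustration: the key `iceberg.flinkfilesystem.delete-file-parallelism` is a hypothetical spelling of the suggested rename (the final name is not shown in this thread), `DeletePoolSketch` is an invented class, and a plain JDK fixed thread pool stands in for whatever the PR actually uses. Only the `DEFAULT_DELETE_CORE_MULTIPLE = 4` constant and the `static volatile` field are taken from the quoted code.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeletePoolSketch {
  // Hypothetical key following the suggested `flinkfilesystem` naming; not confirmed by the PR.
  static final String DELETE_FILE_PARALLELISM = "iceberg.flinkfilesystem.delete-file-parallelism";
  static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;

  // volatile so that a fully constructed pool is visible to all threads
  private static volatile ExecutorService executorService;

  /** Uses the configured value if present, otherwise (available cores x 4). */
  static int parallelism(Map<String, String> properties) {
    String configured = properties.get(DELETE_FILE_PARALLELISM);
    if (configured != null) {
      return Integer.parseInt(configured);
    }
    return Runtime.getRuntime().availableProcessors() * DEFAULT_DELETE_CORE_MULTIPLE;
  }

  /** Double-checked locking: the pool is created at most once per JVM. */
  static ExecutorService executorService(Map<String, String> properties) {
    if (executorService == null) {
      synchronized (DeletePoolSketch.class) {
        if (executorService == null) {
          executorService =
              Executors.newFixedThreadPool(
                  parallelism(properties),
                  runnable -> {
                    Thread thread = new Thread(runnable, "delete-pool-sketch");
                    thread.setDaemon(true); // do not keep the JVM alive for idle workers
                    return thread;
                  });
        }
      }
    }
    return executorService;
  }

  public static void main(String[] args) {
    System.out.println(parallelism(Map.of(DELETE_FILE_PARALLELISM, "8")));
  }
}
```

Because the pool is static, only the properties seen by the first caller determine its size; later calls with a different value return the existing pool.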






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-23 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576007535


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {

Review Comment:
   Done, please check the comment



##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";

Review Comment:
   done




Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-22 Thread via GitHub


stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1575403714


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, 
SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = 
"iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = 
"iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap properties = 
SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to list prefix: %s", prefix), e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+}
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+AtomicInteger failureCount = new AtomicInteger(0);
+Tasks.foreach(pathsToDelete)
+.executeWith(executorService())
+.retry(DELETE_RETRY_ATTEMPTS)
+.stopRetryOn(FileNotFoundException.class)
+.suppressFailureWhenFinished()
+.onFailure(
+(f, e) -> {
+  LOG.error("Failure during bulk delete on file: {} ", f, e);
+  failureCount.incrementAndGet();
+})
+.run(this::deleteFile);

Review Comment:
   It seems that we can't leverage batch/bulk deletion here, which can be a 
significant concern.
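
   As a rough illustration of the concern (not code from this PR): the chain above still issues one `delete` call per path and only spreads those calls over a thread pool, so nothing reaches the store as a single batch request. A minimal sketch of that shape with plain `java.util.concurrent`; `ParallelDeleteSketch`, `deleteAll`, and the `deleteOne` callback are hypothetical names standing in for `Tasks.foreach(...)` and `deleteFile`:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch: per-file deletes fanned out over a pool, with a failure count. */
class ParallelDeleteSketch {

  /** Calls deleteOne once per path on a fixed pool and returns how many paths failed. */
  static int deleteAll(List<String> paths, Consumer<String> deleteOne, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    AtomicInteger failures = new AtomicInteger();

    for (String path : paths) {
      pool.execute(
          () -> {
            try {
              deleteOne.accept(path); // one request per file, never a batch
            } catch (RuntimeException e) {
              failures.incrementAndGet(); // keep going, report the total at the end
            }
          });
    }

    pool.shutdown(); // no new tasks; already submitted ones still run
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("Deletes did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while deleting", e);
    }

    return failures.get();
  }
}
```

   A FileIO that talks to a store's native batch API could hand over the whole list in far fewer requests, which is the part that seems to be unavailable through Flink's `FileSystem` here.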



##
flink/v1.18/flink/src/main/java/org/apache/ic

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#issuecomment-2066270577

   > Quick question: now that flink 1.19 is available in the repo, do we still 
merge this to 1.18 and then later we port it to 1.19 and all the other versions?
   
   I usually try to keep the history so that reviewers can follow the changes.
   
   So unless the changes pile up to the point where keeping the history becomes 
confusing, I would keep it on 1.18 and port it to 1.19/1.17 after merging this PR.
   





Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572152325


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.DelegatingInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);
+
+  private final Path path;
+  private final FileSystem fs;
+  private FileStatus stat = null;
+  private Long length = null;
+  private NativeFileCryptoParameters nativeDecryptionParameters;
+
+  public FlinkInputFile(Path path) {
+this.path = path;
+try {
+  this.fs = path.getFileSystem();
+} catch (IOException e) {
+  throw new UncheckedIOException(
+  String.format("Failed to get file system for path: %s", path), e);
+}
+  }
+
+  public FlinkInputFile(Path path, long length) {
+this(path);
+this.length = length;
+  }
+
+  @Override
+  public long getLength() {
+if (length == null) {
+  this.length = lazyStat().getLen();
+}
+
+return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+try {
+  return new FlinkSeekableInputStream(path.getFileSystem().open(path));
+} catch (FileNotFoundException e) {
+  throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
+} catch (IOException e) {
+  throw new UncheckedIOException(
+  String.format("Failed to open input stream for file: %s", path), e);
+}
+  }
+
+  @Override
+  public String location() {
+return path.toString();
+  }
+
+  @Override
+  public boolean exists() {
+try {
+  return lazyStat() != null;
+} catch (NotFoundException e) {
+  return false;
+}
+  }
+
+  private FileStatus lazyStat() {
+if (stat == null) {

Review Comment:
   If concurrent calls happen for whatever reason, each of them receives the 
same data, so it is no big deal if this is not synchronized. On the other hand, 
skipping the synchronization saves its cost, which seems like the better deal.
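
   A minimal sketch of the trade-off, assuming the cached value is immutable (or otherwise safe to publish without a lock); `LazyValue` and its `loader` are hypothetical stand-ins for `lazyStat()` and the file-status lookup, not the PR's code:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of unsynchronized lazy initialization (a benign race). */
class LazyValue<T> {
  private final Supplier<T> loader; // stands in for the file-status lookup
  private T cached; // deliberately neither volatile nor guarded by a lock

  LazyValue(Supplier<T> loader) {
    this.loader = loader;
  }

  T get() {
    T local = cached; // read the field once
    if (local == null) {
      local = loader.get(); // may run more than once if threads race here
      cached = local; // last writer wins; all candidates carry the same data
    }
    return local;
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    LazyValue<String> stat =
        new LazyValue<>(
            () -> {
              calls.incrementAndGet();
              return "stat-for-file";
            });

    stat.get();
    stat.get(); // served from the cached field, no second lookup
    System.out.println("loader calls: " + calls.get());
  }
}
```

   Under contention the loader may run more than once, but every caller still gets equal data and no lock is taken on the read path. If the cached object were mutable or not safe to publish, this shortcut would no longer hold.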



##
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkFileIOTest.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.stream.Collector

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572150806


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+AtomicInteger failureCount = new AtomicInteger(0);
+Tasks.foreach(pathsToDelete)
+.executeWith(executorService())
+.retry(DELETE_RETRY_ATTEMPTS)
+.stopRetryOn(FileNotFoundException.class)
+.suppressFailureWhenFinished()
+.onFailure(
+(f, e) -> {
+  LOG.error("Failure during bulk delete on file: {} ", f, e);
+  failureCount.incrementAndGet();
+})
+.run(this::deleteFile);
+
+if (failureCount.get() != 0) {
+  throw new BulkDeletionFailureException(failureCount.get());
+}

Review Comment:
   I wouldn't put an atomic integer and synchronization there just for debug 
logging. Also, this is called from multiple places, and without context this 
i

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572151051


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+AtomicInteger failureCount = new AtomicInteger(0);
+Tasks.foreach(pathsToDelete)
+.executeWith(executorService())
+.retry(DELETE_RETRY_ATTEMPTS)
+.stopRetryOn(FileNotFoundException.class)
+.suppressFailureWhenFinished()
+.onFailure(
+(f, e) -> {
+  LOG.error("Failure during bulk delete on file: {} ", f, e);
+  failureCount.incrementAndGet();
+})
+.run(this::deleteFile);
+
+if (failureCount.get() != 0) {
+  throw new BulkDeletionFailureException(failureCount.get());
+}
+  }
+
+  @Override
+  public void initialize(Map props) {
+this.properties = SerializableMap.copyOf(props);
+  }
+
+  @Override
+  public Map properties() {
+return 

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572147961


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);

Review Comment:
   Done






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-19 Thread via GitHub


pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572124567


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);

Review Comment:
   Done






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571094073


##
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkFileIOTest.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class FlinkFileIOTest {
+  private final Random random = new Random(1);
+
+  private FileSystem fs;
+  private FlinkFileIO flinkFileIO;
+
+  @TempDir private File tempDir;
+
+  @BeforeEach
+  public void before() throws Exception {
+fs = FileSystem.getLocalFileSystem();
+
+flinkFileIO = new FlinkFileIO();
+  }
+
+  @Test
+  public void testListPrefix() {
+Path parent = new Path(tempDir.toURI());
+
+List scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+scaleSizes
+.parallelStream()
+.forEach(
+scale -> {
+  Path scalePath = new Path(parent, Integer.toString(scale));
+
+  createRandomFiles(scalePath, scale);
+  Assertions.assertThat(
+  Streams.stream(flinkFileIO.listPrefix(scalePath.toUri().toString())).count())
+  .isEqualTo((long) scale);
+});
+
+long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+
Assertions.assertThat(Streams.stream(flinkFileIO.listPrefix(parent.toUri().toString())).count())

Review Comment:
   Let's please statically import 






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571092822


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.encryption.NativeFileCryptoParameters;
+import org.apache.iceberg.encryption.NativelyEncryptedFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.DelegatingInputStream;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkInputFile implements InputFile, NativelyEncryptedFile {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class);
+
+  private final Path path;
+  private final FileSystem fs;
+  private FileStatus stat = null;
+  private Long length = null;
+  private NativeFileCryptoParameters nativeDecryptionParameters;
+
+  public FlinkInputFile(Path path) {
+this.path = path;
+try {
+  this.fs = path.getFileSystem();
+} catch (IOException e) {
+  throw new UncheckedIOException(
+  String.format("Failed to get file system for path: %s", path), e);
+}
+  }
+
+  public FlinkInputFile(Path path, long length) {
+this(path);
+this.length = length;
+  }
+
+  @Override
+  public long getLength() {
+if (length == null) {
+  this.length = lazyStat().getLen();
+}
+
+return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {
+try {
+  return new FlinkSeekableInputStream(path.getFileSystem().open(path));
+} catch (FileNotFoundException e) {
+  throw new NotFoundException(e, "Failed to open input stream for file: %s", path);
+} catch (IOException e) {
+  throw new UncheckedIOException(
+  String.format("Failed to open input stream for file: %s", path), e);
+}
+  }
+
+  @Override
+  public String location() {
+return path.toString();
+  }
+
+  @Override
+  public boolean exists() {
+try {
+  return lazyStat() != null;
+} catch (NotFoundException e) {
+  return false;
+}
+  }
+
+  private FileStatus lazyStat() {
+if (stat == null) {

Review Comment:
   Do we need to synchronize here?
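
If a `FlinkInputFile` can be shared between threads, one option is double-checked locking on a `volatile` field. The following is only a sketch under that assumption: `LazyStatSketch` and its `loader` are hypothetical names standing in for the `stat` field and the file system call, and nothing in the PR confirms that instances are actually shared. If each instance stays on a single thread, the plain null check is sufficient.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the lazily fetched file status; not code from the PR. */
final class LazyStatSketch<T> {
  // Stands in for the file system call that fetches the status.
  private final Supplier<T> loader;
  // volatile so that a value written by one thread is visible to all others.
  private volatile T stat;

  LazyStatSketch(Supplier<T> loader) {
    this.loader = loader;
  }

  T get() {
    T local = stat; // single volatile read on the fast path
    if (local == null) {
      synchronized (this) {
        local = stat; // re-check: another thread may have loaded it meanwhile
        if (local == null) {
          local = loader.get();
          stat = local;
        }
      }
    }
    return local;
  }
}
```

With this shape the loader runs at most once as long as it returns a non-null value; a loader that returns null would be invoked again on the next call.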






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571089828


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+AtomicInteger failureCount = new AtomicInteger(0);
+Tasks.foreach(pathsToDelete)
+.executeWith(executorService())
+.retry(DELETE_RETRY_ATTEMPTS)
+.stopRetryOn(FileNotFoundException.class)
+.suppressFailureWhenFinished()
+.onFailure(
+(f, e) -> {
+  LOG.error("Failure during bulk delete on file: {} ", f, e);
+  failureCount.incrementAndGet();
+})
+.run(this::deleteFile);
+
+if (failureCount.get() != 0) {
+  throw new BulkDeletionFailureException(failureCount.get());
+}
+  }
+
+  @Override
+  public void initialize(Map<String, String> props) {
+this.properties = SerializableMap.copyOf(props);
+  }
+
+  @Override
+  public Map<String, String> properties() {
+re

Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571086551


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+AtomicInteger failureCount = new AtomicInteger(0);
+Tasks.foreach(pathsToDelete)
+.executeWith(executorService())
+.retry(DELETE_RETRY_ATTEMPTS)
+.stopRetryOn(FileNotFoundException.class)
+.suppressFailureWhenFinished()
+.onFailure(
+(f, e) -> {
+  LOG.error("Failure during bulk delete on file: {} ", f, e);
+  failureCount.incrementAndGet();
+})
+.run(this::deleteFile);
+
+if (failureCount.get() != 0) {
+  throw new BulkDeletionFailureException(failureCount.get());
+}

Review Comment:
   Maybe add a `LOG.debug` statement indicating how many files were deleted.
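
A count for such a log line could be kept next to the failure counter. The sketch below is a hedged illustration only: `BulkDeleteCountSketch`, `deleteAll`, and `summary` are hypothetical names, the loop is sequential rather than the `Tasks` pipeline used in the PR, and `deleteOne` stands in for `deleteFile`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch; the class and method names are not from the PR. */
final class BulkDeleteCountSketch {
  private BulkDeleteCountSketch() {}

  /**
   * Deletes every path with deleteOne (a stand-in for deleteFile) and returns
   * a two-element array: {deleted, failed}.
   */
  static int[] deleteAll(Iterable<String> paths, Consumer<String> deleteOne) {
    // Atomic counters mirror the PR, where deletes run on an executor.
    AtomicInteger deleted = new AtomicInteger(0);
    AtomicInteger failed = new AtomicInteger(0);
    for (String path : paths) {
      try {
        deleteOne.accept(path);
        deleted.incrementAndGet();
      } catch (RuntimeException e) {
        failed.incrementAndGet();
      }
    }
    return new int[] {deleted.get(), failed.get()};
  }

  /** Builds the text that a debug log line could carry. */
  static String summary(int deleted, int failed) {
    return String.format("Bulk delete finished: %d deleted, %d failed", deleted, failed);
  }
}
```

In the PR itself the equivalent would presumably be a second `AtomicInteger` incremented inside the task and reported through `LOG.debug` after `run` returns.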




Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571083046


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);
+}
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+Path prefixToDelete = new Path(prefix);
+
+try {
+  prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);

Review Comment:
   Like you did in `deleteFile`, you could enrich the `UncheckedIOException` with a message:
   ```
   throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
   ```
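
For reference, a self-contained sketch of the wrapping pattern using only the JDK. `EnrichedIoErrorSketch` and `IoAction` are hypothetical names; `IoAction` stands in for the Flink file system call, which is assumed to throw a checked `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of the suggestion; the names are not from the PR. */
final class EnrichedIoErrorSketch {
  private EnrichedIoErrorSketch() {}

  /** Stand-in for a file system operation that can throw IOException. */
  interface IoAction {
    void run() throws IOException;
  }

  static void deletePrefix(String prefix, IoAction delete) {
    try {
      delete.run();
    } catch (IOException e) {
      // Keep the original exception as the cause and add the prefix to the message.
      throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
    }
  }
}
```

The two-argument `UncheckedIOException(String, IOException)` constructor preserves the cause, so the stack trace of the underlying failure is not lost.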






Re: [PR] Flink: FlinkFileIO implementation [iceberg]

2024-04-18 Thread via GitHub


rodmeneses commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571082650


##
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java:
##
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+Path toDelete = new Path(path);
+try {
+  toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+} catch (IOException e) {
+  throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+}
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+LOG.debug("Listing {}", prefix);
+Path prefixToList = new Path(prefix);
+try {
+  return listPrefix(prefixToList.getFileSystem(), prefixToList);
+} catch (IOException e) {
+  throw new UncheckedIOException(e);

Review Comment:
   Like you did in `deleteFile`, you could enrich the `UncheckedIOException` with a message:
   ```
   throw new UncheckedIOException(String.format("Failed to list prefix: %s", prefix), e);
   ```


