Re: [PR] Flink: FlinkFileIO implementation [iceberg]
catkins commented on PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#issuecomment-2132549187

This would be a great addition. I got a bit caught out in development this week when setting up an Iceberg sink on my local Docker Flink setup, and was confused about why the S3 config from the Flink config wasn't being picked up by Iceberg.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
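The mismatch described in the comment above comes from Iceberg choosing its own FileIO instead of going through Flink's FileSystem. A minimal sketch of how a catalog could be pointed at the FileIO proposed in this PR is shown here. The `io-impl` and `warehouse` keys are Iceberg catalog properties; the class name `org.apache.iceberg.flink.FlinkFileIO` is taken from the PR under review and is an assumption, since the PR is not merged. The helper `catalogProperties` and the warehouse location are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class FileIoConfigSketch {
  /**
   * Builds catalog properties that select a FileIO implementation by class name.
   * Hypothetical helper for illustration only.
   */
  public static Map<String, String> catalogProperties(String warehouse, String fileIoClass) {
    Map<String, String> props = new HashMap<>();
    props.put("warehouse", warehouse); // table root location
    props.put("io-impl", fileIoClass); // FileIO implementation to load
    return props;
  }

  public static void main(String[] args) {
    // Assumption: class name as proposed in the PR, not a released API.
    Map<String, String> props =
        catalogProperties("s3://example-bucket/warehouse", "org.apache.iceberg.flink.FlinkFileIO");
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

With a FileIO of this kind, storage credentials and endpoints would be expected to come from the Flink filesystem configuration rather than from separate Iceberg properties.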
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1580188262

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";
+  private static final int DELETE_RETRY_ATTEMPTS = 3;
+  private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4;
+  private static volatile ExecutorService executorService;
+  private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return new FlinkInputFile(new Path(path));
+  }
+
+  @Override
+  public InputFile newInputFile(String path, long length) {
+    return new FlinkInputFile(new Path(path), length);
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return new FlinkOutputFile(new Path(path));
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    Path toDelete = new Path(path);
+    try {
+      toDelete.getFileSystem().delete(toDelete, false /* not recursive */);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e);
+    }
+  }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    LOG.debug("Listing {}", prefix);
+    Path prefixToList = new Path(prefix);
+    try {
+      return listPrefix(prefixToList.getFileSystem(), prefixToList);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to listing prefix: %s", prefix), e);
+    }
+  }
+
+  @Override
+  public void deletePrefix(String prefix) {
+    Path prefixToDelete = new Path(prefix);
+
+    try {
+      prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */);
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Failed to delete prefix: %s", prefix), e);
+    }
+  }
+
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
I meant that if the underlying storage is S3, `S3FileIO` should probably be used (instead of `HadoopFileIO`).
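The difference behind this suggestion can be made concrete with a small sketch. `deleteFiles` in the quoted hunk issues one `FileSystem.delete` call per path, while an object-store bulk API removes many keys per request (the S3 DeleteObjects call accepts up to 1000 keys). The class `BatchDeleteSketch`, the helper `batches`, and the sample paths are hypothetical and only count requests; nothing here calls S3 or Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDeleteSketch {
  /** Splits paths into consecutive groups of at most batchSize elements. */
  public static List<List<String>> batches(List<String> paths, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<String>> result = new ArrayList<>();
    for (int start = 0; start < paths.size(); start += batchSize) {
      int end = Math.min(start + batchSize, paths.size());
      result.add(new ArrayList<>(paths.subList(start, end)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      paths.add("s3://example-bucket/data/file-" + i + ".parquet");
    }
    int perFileRequests = paths.size();                // one delete call per path
    int batchedRequests = batches(paths, 1000).size(); // one call per batch of up to 1000
    // prints "2500 single deletes vs 3 batched requests"
    System.out.println(perFileRequests + " single deletes vs " + batchedRequests + " batched requests");
  }
}
```

A thread pool, as used in the quoted code, shortens the wall-clock time of per-file deletes but does not reduce the number of requests sent to the store.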
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1580019899

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
Why? For the record, the Flink FileSystem implementation for S3 is based on the AWS packages: https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-base
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1579860068

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
If the storage is S3, `HadoopFileIO` shouldn't be used.
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576011008

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
This is the same as in `HadoopFileIO`.
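For readers unfamiliar with the `Tasks` builder chain in the quoted hunk, its per-file behaviour can be approximated in plain Java. This is a sketch under stated assumptions, not the Iceberg implementation: it runs sequentially instead of on an executor, omits any backoff between attempts, and assumes `retry(3)` means up to three retries after the first attempt. `RetryingDeleteSketch`, `Deleter`, and `deleteAll` are hypothetical names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryingDeleteSketch {
  /** Hypothetical stand-in for a single-file delete call. */
  public interface Deleter {
    void delete(String path) throws IOException;
  }

  /**
   * Tries each path up to (1 + maxRetries) times, stops retrying a path on
   * FileNotFoundException, and returns how many paths could not be deleted.
   */
  public static int deleteAll(List<String> paths, Deleter deleter, int maxRetries) {
    AtomicInteger failureCount = new AtomicInteger(0);
    for (String path : paths) {
      boolean deleted = false;
      for (int attempt = 0; attempt <= maxRetries && !deleted; attempt++) {
        try {
          deleter.delete(path);
          deleted = true;
        } catch (FileNotFoundException e) {
          break; // treated as non-retryable
        } catch (IOException e) {
          // treated as transient: the loop moves on to the next attempt
        }
      }
      if (!deleted) {
        failureCount.incrementAndGet(); // failures are counted, not rethrown
      }
    }
    return failureCount.get();
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger(0);
    Deleter flaky = path -> {
      calls.incrementAndGet();
      if (path.endsWith("missing")) {
        throw new FileNotFoundException(path);
      }
      if (path.endsWith("flaky") && calls.get() < 3) {
        throw new IOException("transient failure");
      }
    };
    int failures = deleteAll(List.of("dir/a", "dir/flaky", "dir/missing"), flaky, 3);
    // prints "failed deletes: 1"
    System.out.println("failed deletes: " + failures);
  }
}
```

A caller would typically compare the returned count against zero and raise a bulk-deletion error, which is what the `failureCount` field and the `throws BulkDeletionFailureException` clause in the quoted method suggest.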
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576012292

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";

Review Comment:
Let's call it `flinkfilesystem`. I think that is for the best.
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1576007535

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {

Review Comment:
Done, please check the comment.

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
+  private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete";

Review Comment:
Done.
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
stevenzwu commented on code in PR #10151:
URL: https://github.com/apache/iceberg/pull/10151#discussion_r1575403714

## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ##
@@ -0,0 +1,183 @@
[...]
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(DELETE_RETRY_ATTEMPTS)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (f, e) -> {
+              LOG.error("Failure during bulk delete on file: {} ", f, e);
+              failureCount.incrementAndGet();
+            })
+        .run(this::deleteFile);

Review Comment:
It seems that we can't leverage batch/bulk deletion here, which can be a significant concern.

## flink/v1.18/flink/src/main/java/org/apache/ic
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on PR #10151: URL: https://github.com/apache/iceberg/pull/10151#issuecomment-2066270577
> Quick question: now that flink 1.19 is available in the repo, do we still merge this to 1.18 and then later we port it to 1.19 and all the other versions?
I usually try to keep the history so that reviewers can follow the changes. So until we have so many changes that keeping the history becomes confusing, I would keep it on 1.18, and port it to 1.19/1.17 after merging this PR.
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572152325 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java: ## @@ -0,0 +1,179 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.DelegatingInputStream; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkInputFile implements InputFile, NativelyEncryptedFile { + private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class); + + private final Path path; + private final FileSystem fs; + private FileStatus stat = null; + private Long length = null; + private NativeFileCryptoParameters nativeDecryptionParameters; + + public FlinkInputFile(Path path) { +this.path = path; +try { + this.fs = path.getFileSystem(); +} catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to get file system for path: %s", path), e); +} + } + + public FlinkInputFile(Path path, long length) { +this(path); +this.length = length; + } + + @Override + public long getLength() { +if (length == null) { + this.length = lazyStat().getLen(); +} + +return length; + } + + @Override + public SeekableInputStream newStream() { +try { + return new FlinkSeekableInputStream(path.getFileSystem().open(path)); +} catch (FileNotFoundException e) { + throw new NotFoundException(e, "Failed to open input stream for file: %s", path); +} catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to open input 
stream for file: %s", path), e); +} + } + + @Override + public String location() { +return path.toString(); + } + + @Override + public boolean exists() { +try { + return lazyStat() != null; +} catch (NotFoundException e) { + return false; +} + } + + private FileStatus lazyStat() { +if (stat == null) { Review Comment: If concurrent calls happen for whatever reason, they all receive the same data, so it is no big deal if this is not synchronized. On the other hand, we save the synchronization cost by not doing it, which seems like the better deal. ## flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkFileIOTest.java: ## @@ -0,0 +1,212 @@ +/* + */ +package org.apache.iceberg.flink; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.Vector; +import java.util.stream.Collector
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572150806 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { +AtomicInteger failureCount = new AtomicInteger(0); +Tasks.foreach(pathsToDelete) +.executeWith(executorService()) +.retry(DELETE_RETRY_ATTEMPTS) +.stopRetryOn(FileNotFoundException.class) +.suppressFailureWhenFinished() +.onFailure( +(f, e) -> { + LOG.error("Failure during bulk delete on file: {} ", f, e); + failureCount.incrementAndGet(); +}) +.run(this::deleteFile); + +if (failureCount.get() != 0) { + throw new BulkDeletionFailureException(failureCount.get()); +} Review Comment: I wouldn't put an atomic integer, and synchronization there for just debug logging. Also this is called from multiple places, and without context this i
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572151051 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { +AtomicInteger failureCount = new AtomicInteger(0); +Tasks.foreach(pathsToDelete) +.executeWith(executorService()) +.retry(DELETE_RETRY_ATTEMPTS) +.stopRetryOn(FileNotFoundException.class) +.suppressFailureWhenFinished() +.onFailure( +(f, e) -> { + LOG.error("Failure during bulk delete on file: {} ", f, e); + failureCount.incrementAndGet(); +}) +.run(this::deleteFile); + +if (failureCount.get() != 0) { + throw new BulkDeletionFailureException(failureCount.get()); +} + } + + @Override + public void initialize(Map props) { +this.properties = SerializableMap.copyOf(props); + } + + @Override + public Map properties() { +return
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572147961 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable<FileInfo> listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: Done
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
pvary commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1572124567 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable<FileInfo> listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: Done
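For context on what changed here: the later revision quoted elsewhere in this thread replaces `new UncheckedIOException(e)` with a variant whose message names the operation and the path. A minimal sketch of that wrapping pattern follows; `WrapWithContextSketch`, `IoAction`, and `runWithContext` are hypothetical helper names used only for this illustration, and the PR itself inlines the `try`/`catch` in each method rather than using a helper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Illustration only: hypothetical helper, not code from the PR. */
final class WrapWithContextSketch {
  /** Hypothetical functional interface for an operation that may throw IOException. */
  interface IoAction {
    void run() throws IOException;
  }

  /** Runs the action and rethrows any IOException with the operation and target in the message. */
  static void runWithContext(String operation, String target, IoAction action) {
    try {
      action.run();
    } catch (IOException e) {
      // Keep the original exception as the cause so the stack trace is not lost.
      throw new UncheckedIOException(String.format("Failed to %s: %s", operation, target), e);
    }
  }
}
```

The benefit is that a failure in a log shows which path and which operation were involved, without the reader having to dig through the cause chain.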
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571094073 ## flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/FlinkFileIOTest.java: ## @@ -0,0 +1,212 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.Vector; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class FlinkFileIOTest { + private final Random random = new Random(1); + + private FileSystem fs; + private FlinkFileIO flinkFileIO; + + @TempDir private File tempDir; + + @BeforeEach + public void before() throws Exception { +fs = FileSystem.getLocalFileSystem(); + +flinkFileIO = new FlinkFileIO(); + } + + @Test + public void testListPrefix() { +Path parent = new Path(tempDir.toURI()); + +List scaleSizes = Lists.newArrayList(1, 1000, 2500); + +scaleSizes +.parallelStream() +.forEach( +scale -> { + Path scalePath = new Path(parent, Integer.toString(scale)); + + createRandomFiles(scalePath, scale); + Assertions.assertThat( + Streams.stream(flinkFileIO.listPrefix(scalePath.toUri().toString())).count()) + .isEqualTo((long) scale); +}); + +long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum(); + 
Assertions.assertThat(Streams.stream(flinkFileIO.listPrefix(parent.toUri().toString())).count()) Review Comment: Let's please statically import assertThat.
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571092822 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkInputFile.java: ## @@ -0,0 +1,179 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.encryption.NativeFileCryptoParameters; +import org.apache.iceberg.encryption.NativelyEncryptedFile; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.io.DelegatingInputStream; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkInputFile implements InputFile, NativelyEncryptedFile { + private static final Logger LOG = LoggerFactory.getLogger(FlinkInputFile.class); + + private final Path path; + private final FileSystem fs; + private FileStatus stat = null; + private Long length = null; + private NativeFileCryptoParameters nativeDecryptionParameters; + + public FlinkInputFile(Path path) { +this.path = path; +try { + this.fs = path.getFileSystem(); +} catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to get file system for path: %s", path), e); +} + } + + public FlinkInputFile(Path path, long length) { +this(path); +this.length = length; + } + + @Override + public long getLength() { +if (length == null) { + this.length = lazyStat().getLen(); +} + +return length; + } + + @Override + public SeekableInputStream newStream() { +try { + return new FlinkSeekableInputStream(path.getFileSystem().open(path)); +} catch (FileNotFoundException e) { + throw new NotFoundException(e, "Failed to open input stream for file: %s", path); +} catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to open input 
stream for file: %s", path), e); +} + } + + @Override + public String location() { +return path.toString(); + } + + @Override + public boolean exists() { +try { + return lazyStat() != null; +} catch (NotFoundException e) { + return false; +} + } + + private FileStatus lazyStat() { +if (stat == null) { Review Comment: Do we need to synchronize here?
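To make the question above concrete, here is a minimal sketch of unsynchronized lazy initialization, the pattern `lazyStat()` appears to follow. The reply in this thread argues that concurrent callers would simply compute the same data. `LazyStatSketch`, its `Supplier`-based loader, and the `loads()` counter are hypothetical stand-ins for illustration and are not code from the PR; the sketch caches an immutable `Long`, and whether the same reasoning holds for a cached `FileStatus` depends on that object being safe to publish without synchronization.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Illustration only: hypothetical stand-in for the lazyStat() pattern, not code from the PR. */
final class LazyStatSketch {
  private final Supplier<Long> loader; // stands in for the file system status lookup
  private final AtomicInteger loadCount = new AtomicInteger(); // present only to observe loads
  private Long cached = null; // deliberately neither volatile nor guarded by a lock

  LazyStatSketch(Supplier<Long> loader) {
    this.loader = loader;
  }

  long get() {
    // Read the field once into a local so a concurrent write cannot change it between
    // the null check and the return.
    Long local = cached;
    if (local == null) {
      loadCount.incrementAndGet();
      local = loader.get();
      cached = local;
    }
    return local;
  }

  /** Number of times the loader was invoked. */
  int loads() {
    return loadCount.get();
  }
}
```

Under contention, several threads may each invoke the loader before one of them stores the result, so the cost of skipping synchronization is a few redundant lookups rather than incorrect data, provided the loader is idempotent.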
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571089828 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/*
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { +AtomicInteger failureCount = new AtomicInteger(0); +Tasks.foreach(pathsToDelete) +.executeWith(executorService()) +.retry(DELETE_RETRY_ATTEMPTS) +.stopRetryOn(FileNotFoundException.class) +.suppressFailureWhenFinished() +.onFailure( +(f, e) -> { + LOG.error("Failure during bulk delete on file: {} ", f, e); + failureCount.incrementAndGet(); +}) +.run(this::deleteFile); + +if (failureCount.get() != 0) { + throw new BulkDeletionFailureException(failureCount.get()); +} + } + + @Override + public void initialize(Map props) { +this.properties = SerializableMap.copyOf(props); + } + + @Override + public Map properties() { +re
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571086551 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { +AtomicInteger failureCount = new AtomicInteger(0); +Tasks.foreach(pathsToDelete) +.executeWith(executorService()) +.retry(DELETE_RETRY_ATTEMPTS) +.stopRetryOn(FileNotFoundException.class) +.suppressFailureWhenFinished() +.onFailure( +(f, e) -> { + LOG.error("Failure during bulk delete on file: {} ", f, e); + failureCount.incrementAndGet(); +}) +.run(this::deleteFile); + +if (failureCount.get() != 0) { + throw new BulkDeletionFailureException(failureCount.get()); +} Review Comment: Maybe add a `LOG.debug` statement here indicating how many files were deleted from the given prefix.
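The logging suggestion above could be sketched roughly as follows: count successes alongside failures, then report both once the batch finishes. This is a simplified, sequential stand-in that runs with the JDK alone; it does not use Iceberg's `Tasks` utility or SLF4J, and the names `BulkDeleteCountSketch` and `deleteAll` are hypothetical. In the PR the summary line would presumably go through `LOG.debug` instead of standard output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class BulkDeleteCountSketch {
  /**
   * Applies deleteFile to every path and returns {deleted, failed} counts.
   * The Consumer is a hypothetical stand-in for FileIO#deleteFile.
   */
  static int[] deleteAll(Iterable<String> paths, Consumer<String> deleteFile) {
    // Atomic counters mirror the PR's failureCount, which may be updated from worker threads.
    AtomicInteger deleted = new AtomicInteger();
    AtomicInteger failed = new AtomicInteger();

    for (String path : paths) {
      try {
        deleteFile.accept(path);
        deleted.incrementAndGet();
      } catch (RuntimeException e) {
        failed.incrementAndGet();
      }
    }

    // Stand-in for a debug-level log statement summarizing the batch.
    System.out.printf("Deleted %d files (%d failures)%n", deleted.get(), failed.get());
    return new int[] {deleted.get(), failed.get()};
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.parquet", "b.parquet", "missing.parquet");
    deleteAll(paths, path -> {
      if (path.startsWith("missing")) {
        throw new IllegalStateException("cannot delete " + path);
      }
    });
  }
}
```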
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571083046 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); +} + } + + @Override + public void deletePrefix(String prefix) { +Path prefixToDelete = new Path(prefix); + +try { + prefixToDelete.getFileSystem().delete(prefixToDelete, true /* recursive */); +} catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: As you did in `deleteFile`, you could enrich the `UncheckedIOException` with a message:
```
throw new UncheckedIOException(String.format("Failed to delete prefix %s", prefix), e);
```
Re: [PR] Flink: FlinkFileIO implementation [iceberg]
rodmeneses commented on code in PR #10151: URL: https://github.com/apache/iceberg/pull/10151#discussion_r1571082650 ## flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkFileIO.java: ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.iceberg.io.BulkDeletionFailureException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsPrefixOperations; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.util.SerializableMap; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkFileIO implements FileIO, SupportsPrefixOperations, SupportsBulkOperations { + private static final Logger LOG = LoggerFactory.getLogger(FlinkFileIO.class); + private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism"; + private static final String DELETE_FILE_POOL_NAME = "iceberg-hadoopfileio-delete"; + private static final int DELETE_RETRY_ATTEMPTS = 3; + private static final int DEFAULT_DELETE_CORE_MULTIPLE = 4; + private static volatile ExecutorService executorService; + private SerializableMap properties = SerializableMap.copyOf(ImmutableMap.of()); + + @Override + public InputFile newInputFile(String path) { +return new FlinkInputFile(new Path(path)); + } + + @Override + public InputFile newInputFile(String path, long length) { +return new FlinkInputFile(new 
Path(path), length); + } + + @Override + public OutputFile newOutputFile(String path) { +return new FlinkOutputFile(new Path(path)); + } + + @Override + public void deleteFile(String path) { +Path toDelete = new Path(path); +try { + toDelete.getFileSystem().delete(toDelete, false /* not recursive */); +} catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete file: %s", path), e); +} + } + + @Override + public Iterable listPrefix(String prefix) { +LOG.debug("Listing {}", prefix); +Path prefixToList = new Path(prefix); +try { + return listPrefix(prefixToList.getFileSystem(), prefixToList); +} catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: As you did in `deleteFile`, you could enrich the `UncheckedIOException` with a message:
```
throw new UncheckedIOException(String.format("Failed to list prefix %s", prefix), e);
```
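To illustrate the effect of the suggestion, here is a small self-contained sketch of wrapping an `IOException` with a message that names the operation and the prefix. It relies only on the JDK constructor `UncheckedIOException(String, IOException)`. The class `EnrichedExceptionSketch`, the interface `IoAction`, the helper `runWithContext`, and the path `s3://bucket/path` are all hypothetical and exist only for this example; the PR itself would simply add the message at each `throw` site.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class EnrichedExceptionSketch {
  /** Hypothetical stand-in for a file system call that may throw IOException. */
  interface IoAction {
    void run() throws IOException;
  }

  /** Runs the action and rethrows any IOException with the operation and prefix in the message. */
  static void runWithContext(String operation, String prefix, IoAction action) {
    try {
      action.run();
    } catch (IOException e) {
      // The original exception is kept as the cause, so no detail is lost.
      throw new UncheckedIOException(
          String.format("Failed to %s prefix: %s", operation, prefix), e);
    }
  }

  public static void main(String[] args) {
    try {
      runWithContext("list", "s3://bucket/path", () -> {
        throw new IOException("connection reset");
      });
    } catch (UncheckedIOException e) {
      // The message now says which operation and which prefix failed.
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
    }
  }
}
```

Without the message argument, `UncheckedIOException(e)` reports only the underlying cause, which makes it harder to tell from a stack trace which prefix was being listed or deleted.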