mxm commented on code in PR #13302:
URL: https://github.com/apache/iceberg/pull/13302#discussion_r2182243075


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/OrphanFilesDetector.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.FileURI;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A specialized co-process function that performs an anti-join between two streams of file URIs.
+ *
+ * <p>Emits every file that exists in the file system but is not referenced in the table metadata,
+ * which are considered orphan files. It also handles URI normalization using provided scheme and
+ * authority equivalence mappings.
+ */
+@Internal
+public class OrphanFilesDetector extends KeyedCoProcessFunction<String, 
String, String, String> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OrphanFilesDetector.class);
+
+  // Use MapState to dedupe the strings found in the table
+  private transient MapState<String, Boolean> foundInTable;
+  private transient ValueState<String> foundInFileSystem;
+  private transient ValueState<Boolean> hasUriError;
+  private final DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode;
+  private final Map<String, String> equalSchemes;
+  private final Map<String, String> equalAuthorities;
+
+  public OrphanFilesDetector(
+      DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode,
+      Map<String, String> equalSchemes,
+      Map<String, String> equalAuthorities) {
+    this.prefixMismatchMode = prefixMismatchMode;
+    this.equalSchemes = equalSchemes;
+    this.equalAuthorities = equalAuthorities;
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+    foundInTable =
+        getRuntimeContext()
+            .getMapState(
+                new MapStateDescriptor<>("antiJoinFoundInTable", Types.STRING, 
Types.BOOLEAN));
+    hasUriError =
+        getRuntimeContext().getState(new 
ValueStateDescriptor<>("antiJoinUriError", Types.BOOLEAN));
+    foundInFileSystem =
+        getRuntimeContext()
+            .getState(new ValueStateDescriptor<>("antiJoinFoundInFileSystem", 
Types.STRING));
+  }
+
+  @Override
+  public void processElement1(String value, Context context, Collector<String> 
collector)
+      throws Exception {
+    if (shouldSkipElement(value, context)) {
+      return;
+    }
+
+    if (!foundInTable.contains(value)) {
+      foundInTable.put(value, true);
+      context.timerService().registerEventTimeTimer(context.timestamp());
+    }
+  }
+
+  @Override
+  public void processElement2(String value, Context context, Collector<String> 
collector)
+      throws Exception {
+    if (shouldSkipElement(value, context)) {
+      return;
+    }
+
+    foundInFileSystem.update(value);
+    context.timerService().registerEventTimeTimer(context.timestamp());
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> 
out) throws Exception {
+    if (Boolean.TRUE.equals(hasUriError.value())) {
+      clearState();
+      return;
+    }
+
+    List<FileURI> foundInTablesList = Lists.newArrayList();
+    foundInTable
+        .keys()
+        .forEach(uri -> foundInTablesList.add(new FileURI(uri, equalSchemes, 
equalAuthorities)));
+
+    if (foundInFileSystem.value() != null && foundInTablesList.isEmpty()) {
+      FileURI fileURI = new FileURI(foundInFileSystem.value(), equalSchemes, 
equalAuthorities);
+      out.collect(fileURI.getUriAsString());
+    } else if (foundInFileSystem.value() != null && 
!foundInTablesList.isEmpty()) {
+      FileURI actual = new FileURI(foundInFileSystem.value(), equalSchemes, 
equalAuthorities);
+      if (hasMismatch(actual, foundInTablesList)) {
+        if (prefixMismatchMode == DeleteOrphanFiles.PrefixMismatchMode.DELETE) 
{
+          out.collect(foundInFileSystem.value());
+        } else if (prefixMismatchMode == 
DeleteOrphanFiles.PrefixMismatchMode.ERROR) {
+          ValidationException validationException =
+              new ValidationException(
+                  "Unable to determine whether certain files are orphan. "
+                      + "Metadata references files that match listed/provided 
files except for authority/scheme. "
+                      + "Please, inspect the conflicting authorities/schemes 
and provide which of them are equal "
+                      + "by further configuring the action via equalSchemes() 
and equalAuthorities() methods. "
+                      + "Set the prefix mismatch mode to 'NONE' to ignore 
remaining locations with conflicting "
+                      + "authorities/schemes or to 'DELETE' if you are 
ABSOLUTELY confident that remaining conflicting "
+                      + "authorities/schemes are different. It will be 
impossible to recover deleted files. "
+                      + "Conflicting authorities/schemes");
+          LOG.warn(
+              "Unable to determine whether certain files are orphan. Found in 
filesystem: {} and in table: {}",
+              actual,
+              StringUtils.join(foundInTablesList, ","),
+              validationException);
+          ctx.output(
+              
org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles.ERROR_STREAM,
+              validationException);
+        }
+      }
+    }

Review Comment:
   Can we change the structure to the following, to make it more readable:
   
   ```java
     if (foundInFileSystem.value() != null) {
       if (foundInTablesList.isEmpty()) {
         ..
       } else {
         ..
       }
     }
   ```



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.FileURI;
+import org.apache.iceberg.flink.maintenance.operator.AntiJoin;
+import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor;
+import org.apache.iceberg.flink.maintenance.operator.FileUriConverter;
+import org.apache.iceberg.flink.maintenance.operator.ListFileSystemFiles;
+import org.apache.iceberg.flink.maintenance.operator.ListMetadataFilesProcess;
+import org.apache.iceberg.flink.maintenance.operator.SkipOnError;
+import org.apache.iceberg.flink.maintenance.operator.TablePlanner;
+import org.apache.iceberg.flink.maintenance.operator.TableReader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/** Delete orphan files from the file system. */
+public class DeleteOrphanFiles {
+
+  private static final Schema FILE_PATH_SCHEMA = new 
Schema(DataFile.FILE_PATH);
+  private static final ScanContext FILE_PATH_SCAN_CONTEXT =
+      ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
+  private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+
+  @Internal
+  public static final OutputTag<Exception> ERROR_STREAM =
+      new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
+
+  static final String PLANNER_TASK_NAME = "Table Planner";
+  static final String READER_TASK_NAME = "Files Reader";
+  static final String FILESYSTEM_FILES_TASK_NAME = "Filesystem Files";
+  static final String METADATA_FILES_TASK_NAME = "List metadata Files";
+  static final String DELETE_FILES_TASK_NAME = "Delete File";
+  static final String AGGREGATOR_TASK_NAME = "Orphan Files Aggregator";
+  static final String FILTER_FILES_TASK_NAME = "Filter File";
+  static final String SKIP_ON_ERROR_TASK_NAME = "Skip On Error";
+
+  public static DeleteOrphanFiles.Builder builder() {
+    return new DeleteOrphanFiles.Builder();
+  }
+
+  private DeleteOrphanFiles() {
+    // Do not instantiate directly
+  }
+
+  public static class Builder extends 
MaintenanceTaskBuilder<DeleteOrphanFiles.Builder> {
+    private String location = null;
+    private Duration minAge = Duration.ofDays(3);
+    private int planningWorkerPoolSize = 10;
+    private int deleteBatchSize = 1000;
+    private boolean caseSensitive = false;
+    private int maxListingDepth = 3;
+    private int maxListingDirectSubDirs = 10;
+    private boolean usePrefixListing = false;
+    private Map<String, String> equalSchemes =
+        Maps.newHashMap(
+            ImmutableMap.of(
+                "s3n", "s3",
+                "s3a", "s3a"));
+    private final Map<String, String> equalAuthorities = Maps.newHashMap();
+    private org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode 
prefixMismatchMode =
+        org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode.ERROR;
+
+    /**
+     * The location to start the recursive listing the candidate files for removal. By default, the
+     * {@link Table#location()} is used.
+     *
+     * @param newLocation the task will scan
+     * @return for chained calls
+     */
+    public Builder location(String newLocation) {
+      this.location = newLocation;
+      return this;
+    }
+
+    /**
+     * Whether to use prefix listing when listing files from the file system.
+     *
+     * @param newUsePrefixListing true to enable prefix listing, false 
otherwise
+     * @return for chained calls
+     */
+    public Builder usePrefixListing(boolean newUsePrefixListing) {
+      this.usePrefixListing = newUsePrefixListing;
+      return this;
+    }
+
+    /**
+     * The maximum number of direct subdirectories to list in a single 
directory.
+     *
+     * @param newMaxListingDirectSubDirs the maximum number of direct 
sub-directories to list
+     * @return for chained calls
+     */
+    public Builder maxListingDirectSubDirs(int newMaxListingDirectSubDirs) {
+      this.maxListingDirectSubDirs = newMaxListingDirectSubDirs;
+      return this;
+    }
+
+    /**
+     * The maximum depth to recurse when listing files from the file system.
+     *
+     * @param newMaxListingDepth the maximum depth to recurse
+     * @return for chained calls
+     */
+    public Builder maxListingDepth(int newMaxListingDepth) {
+      this.maxListingDepth = newMaxListingDepth;
+      return this;
+    }
+
+    /**
+     * Action behavior when location prefixes (schemes/authorities) mismatch.
+     *
+     * @param newPrefixMismatchMode to action when mismatch
+     * @return for chained calls
+     */
+    public Builder prefixMismatchMode(
+        org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode 
newPrefixMismatchMode) {

Review Comment:
   This compiles for me:
   
   ```java
   import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
   ...
       private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
   ...
       public Builder prefixMismatchMode(PrefixMismatchMode newPrefixMismatchMode) {
         this.prefixMismatchMode = newPrefixMismatchMode;
         return this;
       }
   ```
   
   Then, from another class, this works as well:
   
   ```java
   import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
   ...
         DeleteOrphanFiles.builder().prefixMismatchMode(PrefixMismatchMode.IGNORE);
   ```



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/OrphanFilesDetector.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.actions.FileURI;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A specialized co-process function that performs an anti-join between two streams of file URIs.
+ *
+ * <p>Emits every file that exists in the file system but is not referenced in the table metadata,
+ * which are considered orphan files. It also handles URI normalization using provided scheme and
+ * authority equivalence mappings.
+ */
+@Internal
+public class OrphanFilesDetector extends KeyedCoProcessFunction<String, 
String, String, String> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OrphanFilesDetector.class);
+
+  // Use MapState to dedupe the strings found in the table
+  private transient MapState<String, Boolean> foundInTable;
+  private transient ValueState<String> foundInFileSystem;
+  private transient ValueState<Boolean> hasUriError;
+  private final DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode;
+  private final Map<String, String> equalSchemes;
+  private final Map<String, String> equalAuthorities;
+
+  public OrphanFilesDetector(
+      DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode,
+      Map<String, String> equalSchemes,
+      Map<String, String> equalAuthorities) {
+    this.prefixMismatchMode = prefixMismatchMode;
+    this.equalSchemes = equalSchemes;
+    this.equalAuthorities = equalAuthorities;
+  }
+
+  @Override
+  public void open(OpenContext openContext) throws Exception {
+    super.open(openContext);
+    foundInTable =
+        getRuntimeContext()
+            .getMapState(
+                new MapStateDescriptor<>("antiJoinFoundInTable", Types.STRING, 
Types.BOOLEAN));
+    hasUriError =
+        getRuntimeContext().getState(new 
ValueStateDescriptor<>("antiJoinUriError", Types.BOOLEAN));
+    foundInFileSystem =
+        getRuntimeContext()
+            .getState(new ValueStateDescriptor<>("antiJoinFoundInFileSystem", 
Types.STRING));
+  }
+
+  @Override
+  public void processElement1(String value, Context context, Collector<String> 
collector)
+      throws Exception {
+    if (shouldSkipElement(value, context)) {
+      return;
+    }
+
+    if (!foundInTable.contains(value)) {
+      foundInTable.put(value, true);

Review Comment:
   Thanks! That makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to