keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402466931


##########
test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class FindCompactionTmpFilesIT extends SharedMiniClusterBase {

Review Comment:
   If possible, it would be nice to verify that the default behavior of calling 
main is not to delete.  I saw the util has a `--delete` arg that defaults to false.



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1097,6 +1097,11 @@ public enum Property {
   @Experimental
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo 
compactor server.", "2.1.0"),
+  COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
+      PropertyType.TIMEDURATION,
+      "Interval at which Compactors will check to see if the currently 
executing compaction"
+          + " should be cancelled.",

Review Comment:
   ```suggestion
             + " should be cancelled.  This checks for situations like was the 
tablet deleted (split and merge do this), was the table deleted, was a user 
compaction canceled, etc",
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,8 +1088,73 @@ void 
compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, 
ecid);
           }
+        } else {
+          // compactionFailed is called from the Compactor when either a 
compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. 
This block is
+          // entered when the conditional mutator above successfully deletes 
an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet 
directory
+          // that have a corresponding ecid in the name.
+
+          ecidsForTablet.clear();
+          compactions.entrySet().stream().filter(e -> 
e.getValue().compareTo(extent) == 0)
+              .map(Entry::getKey).forEach(ecidsForTablet::add);
+
+          if (!ecidsForTablet.isEmpty()) {
+            final TabletMetadata tm = ctx.getAmple().readTablet(extent, 
ColumnType.DIR);
+            if (tm != null) {
+              final Collection<Volume> vols = 
ctx.getVolumeManager().getVolumes();
+              for (Volume vol : vols) {
+                try {
+                  final String volPath =
+                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + 
Path.SEPARATOR
+                          + extent.tableId().canonical() + Path.SEPARATOR + 
tm.getDirName();
+                  final FileSystem fs = vol.getFileSystem();
+                  for (ExternalCompactionId ecid : ecidsForTablet) {
+                    final String fileSuffix = "_tmp_" + ecid.canonical();
+                    FileStatus[] files = fs.listStatus(new Path(volPath), 
(path) -> {
+                      return path.getName().endsWith(fileSuffix);
+                    });
+                    if (files.length > 0) {
+                      for (FileStatus file : files) {
+                        if (!fs.delete(file.getPath(), false)) {
+                          LOG.warn("Unable to delete ecid tmp file: {}: ", 
file.getPath());
+                        } else {
+                          LOG.debug("Deleted ecid tmp file: {}", 
file.getPath());
+                        }
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  LOG.error("Exception deleting compaction tmp files for 
tablet: {}", extent, e);
+                }
+              }
+            } else {
+              // TabletMetadata does not exist for the extent. This could be 
due to a merge or
+              // split operation. Use the utility to find tmp files at the 
table level
+              missingExtentTables.add(extent.tableId());
+            }
+          }
         }
       });
+
+      if (!missingExtentTables.isEmpty()) {
+        for (TableId tid : missingExtentTables) {
+          try {
+            final Set<Path> matches = 
FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());

Review Comment:
   This could be expensive in the case where lots of compactions fail and end 
up here, but in separate function calls.  We could make missingExtentTables a set 
at the compaction-coordinator level and just add to that set in this function, 
then have a new thread that periodically processes the set.  This could be a 
follow-on issue. 



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {

Review Comment:
   This could be a follow-on issue; it may be nice to add this where it can be run 
from the `accumulo` command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to