This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 99df81c  ref #1998 - fixed possible race condition in test, fixed so 
that tserver will not get notification that compaction completed
99df81c is described below

commit 99df81c6067462d690bac035204aa998f3e6afd0
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Apr 28 15:25:56 2021 +0000

    ref #1998 - fixed possible race condition in test, fixed so that tserver 
will not get notification that compaction completed
---
 .../accumulo/coordinator/CompactionFinalizer.java  |  4 +-
 .../apache/accumulo/test/ExternalCompactionIT.java |  2 +-
 .../accumulo/test/TestCompactionCoordinator.java   |  4 +-
 .../TestCompactionCoordinatorForOfflineTable.java  | 76 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 5 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 348ab43..47b0d26 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -51,12 +51,12 @@ public class CompactionFinalizer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionFinalizer.class);
 
-  private final ServerContext context;
+  protected final ServerContext context;
   private final ExecutorService ntfyExecutor;
   private final ExecutorService backgroundExecutor;
   private final BlockingQueue<ExternalCompactionFinalState> 
pendingNotifications;
 
-  CompactionFinalizer(ServerContext context) {
+  protected CompactionFinalizer(ServerContext context) {
     this.context = context;
     this.pendingNotifications = new ArrayBlockingQueue<>(1000);
     // CBUG configure thread factory
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 018f453..ce583f9 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -373,7 +373,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       writeData(client, table1);
       writeData(client, table1);
 
-      cluster.exec(TestCompactionCoordinator.class);
+      cluster.exec(TestCompactionCoordinatorForOfflineTable.class);
 
       // Wait for coordinator to start
       ExternalCompactionMetrics metrics = null;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java
index f161643..541c36e 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java
@@ -159,9 +159,9 @@ public class TestCompactionCoordinator extends 
CompactionCoordinator
   }
 
   public static void main(String[] args) throws Exception {
-    try (TestCompactionCoordinator compactor =
+    try (TestCompactionCoordinator coordinator =
         new TestCompactionCoordinator(new ServerOpts(), args)) {
-      compactor.runServer();
+      coordinator.runServer();
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
new file mode 100644
index 0000000..ed1306a
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.List;
+
+import org.apache.accumulo.coordinator.CompactionFinalizer;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
+import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestCompactionCoordinatorForOfflineTable extends 
TestCompactionCoordinator
+    implements 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface {
+
+  public static class NonNotifyingCompactionFinalizer extends 
CompactionFinalizer {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class);
+
+    NonNotifyingCompactionFinalizer(ServerContext context) {
+      super(context);
+    }
+
+    public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, 
long fileSize,
+        long fileEntries) {
+
+      var ecfs = new ExternalCompactionFinalState(ecid, extent, 
FinalState.FINISHED, fileSize,
+          fileEntries);
+
+      // write metadata entry
+      LOG.info("Writing completed external compaction to metadata table: {}", 
ecfs);
+      context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+
+      // queue RPC if queue is not full
+      LOG.info("Skipping tserver notification for completed external 
compaction: {}", ecfs);
+    }
+
+  }
+
+  protected TestCompactionCoordinatorForOfflineTable(ServerOpts opts, String[] 
args) {
+    super(opts, args);
+  }
+
+  @Override
+  protected CompactionFinalizer createCompactionFinalizer() {
+    return new NonNotifyingCompactionFinalizer(getContext());
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (TestCompactionCoordinatorForOfflineTable coordinator =
+        new TestCompactionCoordinatorForOfflineTable(new ServerOpts(), args)) {
+      coordinator.runServer();
+    }
+  }
+}

Reply via email to