keith-turner commented on code in PR #3915:
URL: https://github.com/apache/accumulo/pull/3915#discussion_r1391709590


##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -211,6 +269,9 @@ public enum Property {
       "Properties in this category affect the behavior of accumulo overall, 
but"
           + " do not have to be consistent throughout a cloud.",
       "1.3.5"),
+  GENERAL_COMPACTION_WARN_TIME("general.compaction.warn.time", "10m", 
PropertyType.TIMEDURATION,

Review Comment:
   Why not use the top level compaction prefix for this, like 
`compaction.warn.time`?



##########
core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java:
##########
@@ -184,6 +220,32 @@ public void init(InitParameters params) {
       tmpExec.add(new Executor(ceid, maxSize));
     }
 
+    values = "";
+    if (params.getOptions().containsKey("queues")) {
+      values = params.getOptions().get("queues");
+    }
+
+    if (!values.isBlank()) {
+      queueConfigs = GSON.get().fromJson(values, QueueConfig[].class);
+    } else {
+      // Generated a zero-length array to avoid a npe thrown by forEach
+      queueConfigs = new QueueConfig[0];
+    }
+
+    for (QueueConfig queueConfig : queueConfigs) {
+      Long maxSize = queueConfig.maxSize == null ? null
+          : ConfigurationTypeHelper.getFixedMemoryAsBytes(queueConfig.maxSize);
+
+      CompactionExecutorId ceid;
+      String queue = Objects.requireNonNull(queueConfig.name, "'name' must be 
specified");
+      ceid = params.getExecutorManager().getExternalExecutor(queue);
+      tmpExec.add(new Executor(ceid, maxSize));
+    }
+
+    if (tmpExec.size() < 1) {
+      throw new IllegalStateException("No defined executors for this planner");

Review Comment:
   ```suggestion
         throw new IllegalStateException("No defined executors or queues for 
this planner");
   ```



##########
core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java:
##########
@@ -239,6 +240,20 @@ public void testMaxSize() throws Exception {
     assertEquals(CompactionExecutorIdImpl.externalId("large"), 
job.getExecutor());
   }
 
+  @Test
+  public void testQueueCreation() {
+    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+    Configuration conf = EasyMock.createMock(Configuration.class);
+    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.replay(conf, senv);
+
+    String queues = "[{\"name\": \"midsize\", 
\"maxSize\":\"32M\"},{\"name\":\"small\"}]";
+    planner.init(getInitParamQueues(senv, queues));

Review Comment:
   Can use the planner and make the expected queues are returned
   
   ```suggestion
       String queues = "[{\"name\": \"small\", 
\"maxSize\":\"32M\"},{\"name\":\"midsize\"}]";
       planner.init(getInitParamQueues(senv, queues));
   
       var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "1M");
       var params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
       var plan = planner.makePlan(params);
   
       var job = getOnlyElement(plan.getJobs());
       assertEquals(all, job.getFiles());
       assertEquals(CompactionExecutorIdImpl.externalId("small"), 
job.getExecutor());
   
   
       all = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M");
       params = createPlanningParams(all, all, Set.of(), 2, 
CompactionKind.SYSTEM);
       plan = planner.makePlan(params);
   
       job = getOnlyElement(plan.getJobs());
       assertEquals(all, job.getFiles());
       assertEquals(CompactionExecutorIdImpl.externalId("midsize"), 
job.getExecutor());
   ```



##########
core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java:
##########
@@ -184,6 +220,32 @@ public void init(InitParameters params) {
       tmpExec.add(new Executor(ceid, maxSize));
     }
 
+    values = "";
+    if (params.getOptions().containsKey("queues")) {
+      values = params.getOptions().get("queues");
+    }
+
+    if (!values.isBlank()) {
+      queueConfigs = GSON.get().fromJson(values, QueueConfig[].class);
+    } else {
+      // Generated a zero-length array to avoid a npe thrown by forEach
+      queueConfigs = new QueueConfig[0];
+    }
+
+    for (QueueConfig queueConfig : queueConfigs) {
+      Long maxSize = queueConfig.maxSize == null ? null
+          : ConfigurationTypeHelper.getFixedMemoryAsBytes(queueConfig.maxSize);
+
+      CompactionExecutorId ceid;
+      String queue = Objects.requireNonNull(queueConfig.name, "'name' must be 
specified");
+      ceid = params.getExecutorManager().getExternalExecutor(queue);

Review Comment:
   Not a change for this PR. Wondering if new method should be introduced that 
are only for queues and these executor methods deprecated.



##########
core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("deprecation")
+public class CompactionServicesConfigTest {
+
+  private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+  private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
+
+  @Test
+  public void testCompactionProps() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    conf.set("compaction.major.service.default.planner", 
DefaultCompactionPlanner.class.getName());
+    conf.set("compaction.major.service.default.planner.opts.maxOpen", "10");
+    conf.set("compaction.major.service.default.planner.opts.executors",
+        
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]");
+
+    conf.set(oldPrefix.getKey() + "default.planner.opts.ignoredProp", "1");
+    conf.set(newPrefix.getKey() + "default.planner.opts.validProp", "1");
+
+    var compactionConfig = new CompactionServicesConfig(conf);
+    
assertTrue(compactionConfig.getOptions().get("default").containsKey("validProp"));
+    assertEquals("1", 
compactionConfig.getOptions().get("default").get("validProp"));
+    
assertNull(compactionConfig.getOptions().get("default").get("ignoredProp"));
+  }
+
+  @Test
+  public void testDuplicateCompactionPlannerDefs() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    String planner = DefaultCompactionPlanner.class.getName();
+    String oldPlanner = "OldPlanner";
+
+    conf.set(newPrefix.getKey() + "default.planner", planner);
+    conf.set(oldPrefix.getKey() + "default.planner", oldPlanner);
+
+    conf.set(oldPrefix.getKey() + "old.planner", oldPlanner);
+
+    var compactionConfig = new CompactionServicesConfig(conf);
+
+    assertTrue(compactionConfig.getPlanners().containsKey("default"));
+    assertEquals(planner, compactionConfig.getPlanners().get("default"));
+
+    assertTrue(compactionConfig.getPlanners().containsKey("old"));
+    assertEquals(oldPlanner, compactionConfig.getPlanners().get("old"));

Review Comment:
   Could do the following.  This also defends against unexpected extra stuff 
being in the map, could also check the map size for that.
   
   ```suggestion
   assertEquals(Map.of("default",planner,"old",oldPlanner), 
compactionConfig.getPlanners());
   ```



##########
core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java:
##########
@@ -60,7 +60,7 @@ public Map<String,String> getOptions() {
 
   @Override
   public String getFullyQualifiedOption(String key) {
-    return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + 
".opts." + key;
+    return Property.COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." 
+ key;

Review Comment:
   This should probably adapt to the prefix that was used to create the planner.



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -44,6 +44,64 @@
 import com.google.common.base.Preconditions;
 
 public enum Property {
+  COMPACTION_SERVICE_PREFIX("compaction.major.service.", null, 
PropertyType.PREFIX,

Review Comment:
   Could possibly drop major in the property name and document that compaction 
services process major compaction.  
   
   ```suggestion
     COMPACTION_SERVICE_PREFIX("compaction.service.", null, PropertyType.PREFIX,
   ```



##########
core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java:
##########
@@ -26,55 +26,107 @@
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
 /**
  * This class serves to configure compaction services from an {@link 
AccumuloConfiguration} object.
  *
  * Specifically, compaction service properties (those prefixed by 
"tserver.compaction.major
- * .service") are used.
+ * .service" or "compaction.major.service") are used.
  */
+@SuppressWarnings("deprecation")
 public class CompactionServicesConfig {
 
+  private static final Logger log = 
LoggerFactory.getLogger(CompactionServicesConfig.class);
   private final Map<String,String> planners = new HashMap<>();
   private final Map<String,Long> rateLimits = new HashMap<>();
   private final Map<String,Map<String,String>> options = new HashMap<>();
+  private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+  private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
   long defaultRateLimit;
 
   public static final CompactionServiceId DEFAULT_SERVICE = 
CompactionServiceId.of("default");
 
   @SuppressWarnings("removal")
   private long getDefaultThroughput() {
     return ConfigurationTypeHelper
-        
.getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
+        
.getMemoryAsBytes(Property.COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
   }
 
-  private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
-    return 
aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
+  private Map<String,Map<String,String>> 
getConfiguration(AccumuloConfiguration aconf) {
+    Map<String,Map<String,String>> properties = new HashMap<>();
+    properties.put(newPrefix.getKey(), 
aconf.getAllPropertiesWithPrefixStripped(newPrefix));
+
+    Map<String,String> oldProps = new HashMap<>();
+    for (Map.Entry<String,String> entry : 
aconf.getAllPropertiesWithPrefixStripped(oldPrefix)
+        .entrySet()) {
+      // Discard duplicate property definitions
+      if (properties.get(newPrefix.getKey()).containsKey(entry.getKey())) {
+        log.warn("Duplicate property exists for compaction planner: '{}'. "
+            + "Using the value of property '{}'", entry.getKey(), newPrefix + 
entry.getKey());
+      } else {
+        oldProps.put(entry.getKey(), entry.getValue());
+      }
+    }
+    properties.put(oldPrefix.getKey(), oldProps);

Review Comment:
   Wondering if the service de-duplication was moved to this method if it would 
simplify the rest of the code.  Maybe the rest of the code would not need to 
worry about the prefixes, but not sure.
   
   Ignore the suggestion if you think its not useful.  I only made the code 
suggestion to explain what I was thinking with service de-duplication.
   
   ```suggestion
     private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
   
       var newProps =  aconf.getAllPropertiesWithPrefixStripped(newPrefix);
       Map<String,String> properties = new HashMap<>(newProps);
       // get all of the services under the new prefix
       var newServices = 
newProps.keySet().stream().map(prop->prop.split("\\.")[0]).collect(Collectors.toSet());
   
       for (Map.Entry<String,String> entry : 
aconf.getAllPropertiesWithPrefixStripped(oldPrefix)
           .entrySet()) {
         // Discard duplicate service definitions
         var service = entry.getKey().split("\\.")[0];
         if (newServices.contains(service)) {
           log.warn("Duplicate compaction service '{}' definition exists. 
Ignoring property : '{}'", service, entry.getKey());
         } else {
           properties.put(entry.getKey(), entry.getValue());
         }
       }
   ```



##########
core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("deprecation")
+public class CompactionServicesConfigTest {
+
+  private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+  private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
+
+  @Test
+  public void testCompactionProps() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    conf.set("compaction.major.service.default.planner", 
DefaultCompactionPlanner.class.getName());
+    conf.set("compaction.major.service.default.planner.opts.maxOpen", "10");
+    conf.set("compaction.major.service.default.planner.opts.executors",
+        
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]");
+
+    conf.set(oldPrefix.getKey() + "default.planner.opts.ignoredProp", "1");
+    conf.set(newPrefix.getKey() + "default.planner.opts.validProp", "1");
+
+    var compactionConfig = new CompactionServicesConfig(conf);
+    
assertTrue(compactionConfig.getOptions().get("default").containsKey("validProp"));
+    assertEquals("1", 
compactionConfig.getOptions().get("default").get("validProp"));
+    
assertNull(compactionConfig.getOptions().get("default").get("ignoredProp"));
+  }
+
+  @Test
+  public void testDuplicateCompactionPlannerDefs() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    String planner = DefaultCompactionPlanner.class.getName();
+    String oldPlanner = "OldPlanner";
+
+    conf.set(newPrefix.getKey() + "default.planner", planner);
+    conf.set(oldPrefix.getKey() + "default.planner", oldPlanner);
+
+    conf.set(oldPrefix.getKey() + "old.planner", oldPlanner);
+
+    var compactionConfig = new CompactionServicesConfig(conf);
+
+    assertTrue(compactionConfig.getPlanners().containsKey("default"));
+    assertEquals(planner, compactionConfig.getPlanners().get("default"));
+
+    assertTrue(compactionConfig.getPlanners().containsKey("old"));
+    assertEquals(oldPlanner, compactionConfig.getPlanners().get("old"));
+  }
+
+  @Test
+  public void testCompactionPlannerOldDef() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    conf.set(oldPrefix.getKey() + "cs1.planner", 
DefaultCompactionPlanner.class.getName());
+    conf.set(oldPrefix.getKey() + "cs1.planner.opts.maxOpen", "10");
+    conf.set(oldPrefix.getKey() + "cs1.planner.opts.executors",
+        
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]");
+    conf.set(oldPrefix.getKey() + "cs1.planner.opts.foo", "1");
+    conf.set(newPrefix.getKey() + "cs1.planner.opts.bar", "2");
+
+    var compactionConfig = new CompactionServicesConfig(conf);
+    assertTrue(compactionConfig.getOptions().get("cs1").containsKey("foo"));
+    assertEquals("1", compactionConfig.getOptions().get("cs1").get("foo"));
+
+    assertTrue(compactionConfig.getOptions().get("cs1").containsKey("bar"));
+    assertEquals("2", compactionConfig.getOptions().get("cs1").get("bar"));

Review Comment:
   I was thinking service config would only be pulled from a single prefix, not 
a mix of both.  Thinking in this case that since `newPrefix+"cs1"` exists that 
everything under `oldPrefix+"cs1"` should be ignored.  Does not have to be done 
this way, its just what I thought would be less confusing because if a user 
only looks at config under one prefix then they will know its everything.  When 
mixed if the config is sorted, then all the config for a service would not sort 
together (like in the shell).



##########
core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java:
##########
@@ -302,6 +317,60 @@ public void testErrorExternalNoQueue() {
     assertTrue(e.getMessage().contains("queue"), "Error message didn't contain 
queue");
   }
 
+  /**
+   * Tests queue with missing name throws error
+   */
+  @Test
+  public void testErrorQueueNoName() {
+    DefaultCompactionPlanner planner = new DefaultCompactionPlanner();
+    Configuration conf = EasyMock.createMock(Configuration.class);
+    
EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes();
+
+    ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
+    EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.replay(conf, senv);
+
+    String queues =
+        "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, 
{\"type\":\"internal\", \"queue\":\"broken\"}]";

Review Comment:
   This is missing a `name` field, but has a `type` field.  Is that 
intentional?  Could it fail because the `type` field is present instead of the 
`name` field missing?



##########
core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("deprecation")
+public class CompactionServicesConfigTest {
+
+  private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX;
+  private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX;
+
+  @Test
+  public void testCompactionProps() {
+    ConfigurationCopy conf = new ConfigurationCopy();
+
+    conf.set("compaction.major.service.default.planner", 
DefaultCompactionPlanner.class.getName());
+    conf.set("compaction.major.service.default.planner.opts.maxOpen", "10");
+    conf.set("compaction.major.service.default.planner.opts.executors",
+        
"[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]");
+
+    conf.set(oldPrefix.getKey() + "default.planner.opts.ignoredProp", "1");
+    conf.set(newPrefix.getKey() + "default.planner.opts.validProp", "1");

Review Comment:
   Maybe this should be in a separate test method, not sure.  Could also test 
the following case where the old prefix sets the same prop w/ a diff value.
   
   ```suggestion
       conf.set(newPrefix.getKey() + "default.planner.opts.validProp", "1");
       conf.set(oldPrefix.getKey() + "default.planner.opts.validProp", "a");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to