dlmarion commented on code in PR #5353:
URL: https://github.com/apache/accumulo/pull/5353#discussion_r1967681791
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -28,6 +28,8 @@
import java.util.Objects;
import java.util.function.Predicate;
+import javax.print.DocFlavor.STRING;
Review Comment:
Is this import used?
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -904,6 +911,8 @@ public enum Property {
TABLE_ONDEMAND_UNLOADER("tserver.ondemand.tablet.unloader",
"org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader",
PropertyType.CLASSNAME,
"The class that will be used to determine which on-demand Tablets to
unload.", "4.0.0"),
+ TABLE_MAX_MERGEABILITY_THRESHOLD("table.mergeability.threshold", ".25",
PropertyType.FRACTION,
+ "A tablet is mergeable until it reaches this percentage of the split
threshold.", "1.3.5"),
Review Comment:
Is there a reason the version is 1.3.5?
##########
server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.merge;
+
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.Fate.FateOperation;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
+import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class FindMergeableRangeTask implements Runnable {
+
+ private static final Logger log =
LoggerFactory.getLogger(FindMergeableRangeTask.class);
+
+ private static final TabletMergeabilityFilter FILTER = new
TabletMergeabilityFilter();
+
+ private final Manager manager;
+
+ public FindMergeableRangeTask(Manager manager) {
+ this.manager = Objects.requireNonNull(manager);
+ log.debug("Creating FindMergeableRangeTask");
+ }
+
+ @Override
+ public void run() {
+ var context = manager.getContext();
+ Map<TableId,String> tables = context.getTableIdToNameMap();
+
+ log.debug("Starting FindMergeableRangeTask");
+
+ for (Entry<TableId,String> table : tables.entrySet()) {
+ TableId tableId = table.getKey();
+ String tableName = table.getValue();
+
+ long maxFileCount =
+
context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX);
+ long threshold =
+
context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+ double mergeabilityThreshold = context.getTableConfiguration(tableId)
+ .getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD);
+ long maxTotalSize = (long) (threshold * mergeabilityThreshold);
+
+ log.debug("Checking {} for tablets that can be merged", tableName);
+ log.debug("maxFileCount: {}, maxTotalSize:{}, splitThreshold:{},
mergeabilityThreshold:{}",
+ maxFileCount, maxTotalSize, threshold, mergeabilityThreshold);
+ try {
+ NamespaceId namespaceId = context.getNamespaceId(tableId);
+ var type = FateInstanceType.fromTableId(tableId);
+
+ try (var tablets = context.getAmple().readTablets().forTable(tableId)
+ .fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) {
+
+ final MergeableRange current =
+ new MergeableRange(manager.getSteadyTime(), maxFileCount,
maxTotalSize);
+
+ for (var tm : tablets) {
+ log.trace("Checking tablet {}, {}", tm.getExtent(),
tm.getTabletMergeability());
+ if (!current.add(tm)) {
+ submit(current, type, table, namespaceId);
+ current.resetAndAdd(tm);
+ }
+ }
+
+ submit(current, type, table, namespaceId);
+ }
+
+ } catch (Exception e) {
+ log.error("Failed to generate system merges for {}", tableName, e);
+ }
+ }
+
+ }
+
+ void submit(MergeableRange range, FateInstanceType type,
Entry<TableId,String> table,
+ NamespaceId namespaceId) {
+ if (range.tabletCount < 2) {
+ return;
+ }
+
+ log.debug("Table {} found {} tablets that can be merged for table",
table.getValue(),
+ range.tabletCount);
+
+ TableId tableId = table.getKey();
+ String tableName = table.getValue();
+
+ range.startRow =
Optional.ofNullable(range.startRow).map(Text::new).orElse(new Text(""));
+ range.endRow = Optional.ofNullable(range.endRow).map(Text::new).orElse(new
Text(""));
Review Comment:
This modifies the fields of the input variable object. Is that intended, or
is this just being done for the `toString` calls in the next set of lines?
Also, I think this could be done without the extra object creation:
```suggestion
range.startRow = (range.startRow == null) ? new Text("") : new
Text(range.startRow);
range.endRow = (range.endRow == null) ? new Text("") : new
Text(range.endRow);
```
##########
core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java:
##########
@@ -357,4 +358,12 @@ boolean testClassLoad(final String className, final String
asTypeName)
* @since 2.1.0
*/
InstanceId getInstanceId();
+
+ /**
+ * Return the current manager time
Review Comment:
What does this time represent? Is it how long the Manager has been up? In a
highly available Manager deployment, could this time go backwards if the active
Manager changes?
##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -1292,6 +1293,13 @@ boolean canSuspendTablets() {
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(() ->
ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
+ // TODO - create new threadpool?
Review Comment:
I assume that this cannot go into the TabletGroupWatcher because it needs to
look at more than one Tablet? I think using the general scheduled executor
threadpool should be fine. However, we may want to consider bumping the default
value of `GENERAL_THREADPOOL_SIZE` from `1` to something larger. That change
would probably be best done in a separate PR.
##########
server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.merge;
+
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
+import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.Fate.FateOperation;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
+import org.apache.accumulo.core.util.time.SteadyTime;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
+import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class FindMergeableRangeTask implements Runnable {
+
+ private static final Logger log =
LoggerFactory.getLogger(FindMergeableRangeTask.class);
+
+ private static final TabletMergeabilityFilter FILTER = new
TabletMergeabilityFilter();
+
+ private final Manager manager;
+
+ public FindMergeableRangeTask(Manager manager) {
+ this.manager = Objects.requireNonNull(manager);
+ log.debug("Creating FindMergeableRangeTask");
+ }
+
+ @Override
+ public void run() {
+ var context = manager.getContext();
+ Map<TableId,String> tables = context.getTableIdToNameMap();
+
+ log.debug("Starting FindMergeableRangeTask");
+
+ for (Entry<TableId,String> table : tables.entrySet()) {
+ TableId tableId = table.getKey();
+ String tableName = table.getValue();
+
+ long maxFileCount =
+
context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX);
+ long threshold =
+
context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
+ double mergeabilityThreshold = context.getTableConfiguration(tableId)
+ .getFraction(Property.TABLE_MAX_MERGEABILITY_THRESHOLD);
+ long maxTotalSize = (long) (threshold * mergeabilityThreshold);
+
+ log.debug("Checking {} for tablets that can be merged", tableName);
+ log.debug("maxFileCount: {}, maxTotalSize:{}, splitThreshold:{},
mergeabilityThreshold:{}",
+ maxFileCount, maxTotalSize, threshold, mergeabilityThreshold);
+ try {
+ NamespaceId namespaceId = context.getNamespaceId(tableId);
+ var type = FateInstanceType.fromTableId(tableId);
+
+ try (var tablets = context.getAmple().readTablets().forTable(tableId)
+ .fetch(PREV_ROW, FILES, MERGEABILITY).filter(FILTER).build()) {
+
+ final MergeableRange current =
+ new MergeableRange(manager.getSteadyTime(), maxFileCount,
maxTotalSize);
+
+ for (var tm : tablets) {
+ log.trace("Checking tablet {}, {}", tm.getExtent(),
tm.getTabletMergeability());
+ if (!current.add(tm)) {
+ submit(current, type, table, namespaceId);
+ current.resetAndAdd(tm);
+ }
+ }
+
+ submit(current, type, table, namespaceId);
+ }
+
+ } catch (Exception e) {
+ log.error("Failed to generate system merges for {}", tableName, e);
+ }
+ }
+
+ }
+
+ void submit(MergeableRange range, FateInstanceType type,
Entry<TableId,String> table,
+ NamespaceId namespaceId) {
+ if (range.tabletCount < 2) {
+ return;
+ }
+
+ log.debug("Table {} found {} tablets that can be merged for table",
table.getValue(),
+ range.tabletCount);
+
+ TableId tableId = table.getKey();
+ String tableName = table.getValue();
+
+ range.startRow =
Optional.ofNullable(range.startRow).map(Text::new).orElse(new Text(""));
+ range.endRow = Optional.ofNullable(range.endRow).map(Text::new).orElse(new
Text(""));
+
+ String startRowStr = StringUtils.defaultIfBlank(range.startRow.toString(),
"-inf");
+ String endRowStr = StringUtils.defaultIfBlank(range.endRow.toString(),
"+inf");
+ log.debug("FindMergeableRangeTask: Creating merge op: {} from startRow: {}
to endRow: {}",
+ tableId, startRowStr, endRowStr);
+ var fateId = manager.fate(type).startTransaction();
+ String goalMessage = TableOperation.MERGE + " Merge table " + tableName +
"(" + tableId
+ + ") splits from " + startRowStr + " to " + endRowStr;
+
+ manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateId,
+ new TraceRepo<>(new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId,
tableId,
+ range.startRow, range.endRow)),
+ true, goalMessage);
+ }
+
+ static class MergeableRange {
+ final SteadyTime currentTime;
+ final long maxFileCount;
+ final long maxTotalSize;
+
+ Text startRow;
+ Text endRow;
+ int tabletCount;
+ long totalFileCount;
+ long totalFileSize;
+
+ MergeableRange(SteadyTime currentTime, long maxFileCount, long
maxTotalSize) {
+ this.currentTime = currentTime;
+ this.maxFileCount = maxFileCount;
+ this.maxTotalSize = maxTotalSize;
+ }
+
+ boolean add(TabletMetadata tm) {
+ if (validate(tm)) {
+ tabletCount++;
+ log.trace("Adding tablet {} to MergeableRange", tm.getExtent());
+ if (tabletCount == 1) {
+ startRow = tm.getPrevEndRow();
+ }
+ endRow = tm.getEndRow();
+ totalFileCount += tm.getFiles().size();
+ totalFileSize += tm.getFileSize();
+ return true;
+ }
+ return false;
+ }
+
+ private boolean validate(TabletMetadata tm) {
+ if (tabletCount > 0) {
+ // If this is not the first tablet, then verify its prevEndRow matches
+ // the last endRow tracked, the server filter will skip tablets marked
as never
+ if (!tm.getPrevEndRow().equals(endRow)) {
Review Comment:
Could `tm.getPrevEndRow()` be null here? Could `endRow` be null?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]