This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new a3ec20e41a Globally Unique FATE Transaction Ids - Part 4 (#4258) a3ec20e41a is described below commit a3ec20e41a8058106d829f73346f65f1734d1860 Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Fri Feb 16 14:24:58 2024 -0500 Globally Unique FATE Transaction Ids - Part 4 (#4258) This addresses several previously deferred changes for issue #4044. Changes: - ZooReservation now uses FateId (used in Utils) - TabletOperationId now uses FateId - TExternalCompactionJob now uses FateId - VolumeManager and VolumeManagerImpl now use FateId - Utils.getLock() lockData now uses the full FateId - TabletRefresher now uses FateId - Classes which used the above classes updated - Several test changes to reflect new changes - Deferred a couple of changes (in Compactor and CompactionCoordinator) (need pull/4247 merged first) --- .../java/org/apache/accumulo/core/fate/FateId.java | 23 +++- .../core/fate/zookeeper/ZooReservation.java | 24 ++-- .../core/metadata/schema/TabletOperationId.java | 12 +- .../thrift/TExternalCompactionJob.java | 126 ++++++++++++--------- core/src/main/thrift/tabletserver.thrift | 2 +- .../core/metadata/schema/TabletMetadataTest.java | 3 +- .../server/constraints/MetadataConstraints.java | 2 +- .../apache/accumulo/server/fs/VolumeManager.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 11 +- .../constraints/MetadataConstraintsTest.java | 2 +- .../org/apache/accumulo/compactor/Compactor.java | 13 +-- .../coordinator/CompactionCoordinator.java | 3 +- .../apache/accumulo/manager/tableOps/Utils.java | 8 +- .../manager/tableOps/bulkVer2/BulkImportMove.java | 3 +- .../manager/tableOps/bulkVer2/RefreshTablets.java | 3 +- .../manager/tableOps/bulkVer2/TabletRefresher.java | 10 +- .../manager/tableOps/compact/CompactionDriver.java | 5 +- .../manager/tableOps/compact/RefreshTablets.java | 5 +- .../manager/tableOps/delete/ReserveTablets.java | 3 +- .../manager/tableOps/merge/DeleteRows.java | 3 +- .../manager/tableOps/merge/DeleteTablets.java | 3 +- .../manager/tableOps/merge/FinishTableRangeOp.java | 3 +- .../manager/tableOps/merge/MergeTablets.java | 3 +- .../manager/tableOps/merge/ReserveTablets.java | 3 +- .../manager/tableOps/split/DeleteOperationIds.java | 3 +- .../accumulo/manager/tableOps/split/PreSplit.java | 6 +- .../manager/tableOps/split/UpdateTablets.java | 3 +- .../tableOps/tableImport/MoveExportedFiles.java | 3 +- .../compaction/CompactionCoordinatorTest.java | 10 +- .../org/apache/accumulo/test/ScanServerIT.java | 6 +- .../test/functional/AmpleConditionalWriterIT.java | 18 ++- .../test/functional/ManagerAssignmentIT.java | 10 +- .../functional/TabletManagementIteratorIT.java | 6 +- 33 files changed, 191 insertions(+), 152 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 5be742d2fd..8907c6879c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -24,6 +24,7 @@ import java.util.stream.Stream; import org.apache.accumulo.core.data.AbstractId; import org.apache.accumulo.core.manager.thrift.TFateId; +import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.util.FastFormat; /** @@ -107,7 +108,7 @@ public class FateId extends AbstractId<FateId> { * @param fateIdStr the string representation of the FateId * @return true if the string is a valid FateId, false otherwise */ - public static boolean isFormattedTid(String fateIdStr) { + public static boolean isFateId(String fateIdStr) { return FATEID_PATTERN.matcher(fateIdStr).matches(); } @@ -133,6 +134,26 @@ public class FateId extends AbstractId<FateId> { return new FateId(PREFIX + type + ":" + formatTid(tid)); } + /** + * + * @return the TFateId equivalent of the FateId + */ + public TFateId toThrift() { + TFateInstanceType thriftType; + FateInstanceType type = getType(); + switch (type) { + case USER: + thriftType = TFateInstanceType.USER; + break; + case META: + thriftType = TFateInstanceType.META; + break; + default: + throw new IllegalArgumentException("Invalid FateInstanceType: " + type); + } + return new TFateId(thriftType, getTid()); + } + /** * Returns the hex string equivalent of the tid */ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java index 33714b2581..8c6a918301 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java @@ -20,6 +20,7 @@ package org.apache.accumulo.core.fate.zookeeper; import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.zookeeper.KeeperException; @@ -29,15 +30,14 @@ import org.slf4j.LoggerFactory; public class ZooReservation { - public static boolean attempt(ZooReaderWriter zk, String path, String reservationID, - String debugInfo) throws KeeperException, InterruptedException { - if (reservationID.contains(":")) { - throw new IllegalArgumentException(); - } + private static final String DELIMITER = "-"; + + public static boolean attempt(ZooReaderWriter zk, String path, FateId fateId, String debugInfo) + throws KeeperException, InterruptedException { while (true) { try { - zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8), + zk.putPersistentData(path, (fateId.canonical() + DELIMITER + debugInfo).getBytes(UTF_8), NodeExistsPolicy.FAIL); return true; } catch (NodeExistsException nee) { @@ -48,15 +48,15 @@ public class ZooReservation { continue; } - String idInZoo = new String(zooData, UTF_8).split(":")[0]; + FateId idInZoo = FateId.from(new String(zooData, UTF_8).split(DELIMITER)[0]); - return idInZoo.equals(reservationID); + return idInZoo.equals(fateId); } } } - public static void release(ZooReaderWriter zk, String path, String reservationID) + public static void release(ZooReaderWriter zk, String path, FateId fateId) throws KeeperException, InterruptedException { byte[] zooData; @@ -69,11 +69,11 @@ public class ZooReservation { } String zooDataStr = new String(zooData, UTF_8); - String idInZoo = zooDataStr.split(":")[0]; + FateId idInZoo = FateId.from(zooDataStr.split(DELIMITER)[0]); - if (!idInZoo.equals(reservationID)) { + if (!idInZoo.equals(fateId)) { throw new IllegalStateException("Tried to release reservation " + path - + " with data mismatch " + reservationID + " " + zooDataStr); + + " with data mismatch " + fateId + " " + zooDataStr); } zk.recursiveDelete(path, NodeMissingPolicy.SKIP); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java index 49b7cf169a..8da831eacd 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.metadata.schema; import org.apache.accumulo.core.data.AbstractId; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import com.google.common.base.Preconditions; @@ -32,7 +32,7 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { private static final long serialVersionUID = 1L; public static String validate(String opid) { - var fields = opid.split(":"); + var fields = opid.split(":", 2); Preconditions.checkArgument(fields.length == 2, "Malformed operation id %s", opid); try { TabletOperationType.valueOf(fields[0]); @@ -40,7 +40,7 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { throw new IllegalArgumentException("Malformed operation id " + opid, e); } - if (!FateTxId.isFormatedTid(fields[1])) { + if (!FateId.isFateId(fields[1])) { throw new IllegalArgumentException("Malformed operation id " + opid); } @@ -52,7 +52,7 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { } public TabletOperationType getType() { - var fields = canonical().split(":"); + var fields = canonical().split(":", 2); Preconditions.checkState(fields.length == 2); return TabletOperationType.valueOf(fields[0]); } @@ -61,7 +61,7 @@ public class TabletOperationId extends AbstractId<TabletOperationId> { return new TabletOperationId(validate(opid)); } - public static TabletOperationId from(TabletOperationType type, long txid) { - return new TabletOperationId(type + ":" + FateTxId.formatTid(txid)); + public static TabletOperationId from(TabletOperationType type, FateId fateId) { + return new TabletOperationId(type + ":" + fateId); } } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index 6a0523c51b..7a3fdf46d1 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -35,7 +35,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)6); private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)7); - private static final org.apache.thrift.protocol.TField FATE_TX_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fateTxId", org.apache.thrift.protocol.TType.I64, (short)8); + private static final org.apache.thrift.protocol.TField FATE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fateId", org.apache.thrift.protocol.TType.STRUCT, (short)8); private static final org.apache.thrift.protocol.TField OVERRIDES_FIELD_DESC = new org.apache.thrift.protocol.TField("overrides", org.apache.thrift.protocol.TType.MAP, (short)9); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TExternalCompactionJobStandardSchemeFactory(); @@ -48,7 +48,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required public boolean propagateDeletes; // required public @org.apache.thrift.annotation.Nullable TCompactionKind kind; // required - public long fateTxId; // required + public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.manager.thrift.TFateId fateId; // required public @org.apache.thrift.annotation.Nullable java.util.Map<java.lang.String,java.lang.String> overrides; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -60,7 +60,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal OUTPUT_FILE((short)5, "outputFile"), PROPAGATE_DELETES((short)6, "propagateDeletes"), KIND((short)7, "kind"), - FATE_TX_ID((short)8, "fateTxId"), + FATE_ID((short)8, "fateId"), OVERRIDES((short)9, "overrides"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -91,8 +91,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return PROPAGATE_DELETES; case 7: // KIND return KIND; - case 8: // FATE_TX_ID - return FATE_TX_ID; + case 8: // FATE_ID + return FATE_ID; case 9: // OVERRIDES return OVERRIDES; default: @@ -139,7 +139,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal // isset id assignments private static final int __PROPAGATEDELETES_ISSET_ID = 0; - private static final int __FATETXID_ISSET_ID = 1; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -159,8 +158,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM , "TCompactionKind"))); - tmpMap.put(_Fields.FATE_TX_ID, new org.apache.thrift.meta_data.FieldMetaData("fateTxId", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FATE_ID, new org.apache.thrift.meta_data.FieldMetaData("fateId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.manager.thrift.TFateId.class))); tmpMap.put(_Fields.OVERRIDES, new org.apache.thrift.meta_data.FieldMetaData("overrides", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), @@ -180,7 +179,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal java.lang.String outputFile, boolean propagateDeletes, TCompactionKind kind, - long fateTxId, + org.apache.accumulo.core.manager.thrift.TFateId fateId, java.util.Map<java.lang.String,java.lang.String> overrides) { this(); @@ -192,8 +191,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal this.propagateDeletes = propagateDeletes; setPropagateDeletesIsSet(true); this.kind = kind; - this.fateTxId = fateTxId; - setFateTxIdIsSet(true); + this.fateId = fateId; this.overrides = overrides; } @@ -225,7 +223,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (other.isSetKind()) { this.kind = other.kind; } - this.fateTxId = other.fateTxId; + if (other.isSetFateId()) { + this.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(other.fateId); + } if (other.isSetOverrides()) { java.util.Map<java.lang.String,java.lang.String> __this__overrides = new java.util.HashMap<java.lang.String,java.lang.String>(other.overrides); this.overrides = __this__overrides; @@ -247,8 +247,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal setPropagateDeletesIsSet(false); this.propagateDeletes = false; this.kind = null; - setFateTxIdIsSet(false); - this.fateTxId = 0; + this.fateId = null; this.overrides = null; } @@ -441,27 +440,29 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } } - public long getFateTxId() { - return this.fateTxId; + @org.apache.thrift.annotation.Nullable + public org.apache.accumulo.core.manager.thrift.TFateId getFateId() { + return this.fateId; } - public TExternalCompactionJob setFateTxId(long fateTxId) { - this.fateTxId = fateTxId; - setFateTxIdIsSet(true); + public TExternalCompactionJob setFateId(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.manager.thrift.TFateId fateId) { + this.fateId = fateId; return this; } - public void unsetFateTxId() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __FATETXID_ISSET_ID); + public void unsetFateId() { + this.fateId = null; } - /** Returns true if field fateTxId is set (has been assigned a value) and false otherwise */ - public boolean isSetFateTxId() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __FATETXID_ISSET_ID); + /** Returns true if field fateId is set (has been assigned a value) and false otherwise */ + public boolean isSetFateId() { + return this.fateId != null; } - public void setFateTxIdIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __FATETXID_ISSET_ID, value); + public void setFateIdIsSet(boolean value) { + if (!value) { + this.fateId = null; + } } public int getOverridesSize() { @@ -559,11 +560,11 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } break; - case FATE_TX_ID: + case FATE_ID: if (value == null) { - unsetFateTxId(); + unsetFateId(); } else { - setFateTxId((java.lang.Long)value); + setFateId((org.apache.accumulo.core.manager.thrift.TFateId)value); } break; @@ -603,8 +604,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal case KIND: return getKind(); - case FATE_TX_ID: - return getFateTxId(); + case FATE_ID: + return getFateId(); case OVERRIDES: return getOverrides(); @@ -635,8 +636,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return isSetPropagateDeletes(); case KIND: return isSetKind(); - case FATE_TX_ID: - return isSetFateTxId(); + case FATE_ID: + return isSetFateId(); case OVERRIDES: return isSetOverrides(); } @@ -719,12 +720,12 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return false; } - boolean this_present_fateTxId = true; - boolean that_present_fateTxId = true; - if (this_present_fateTxId || that_present_fateTxId) { - if (!(this_present_fateTxId && that_present_fateTxId)) + boolean this_present_fateId = true && this.isSetFateId(); + boolean that_present_fateId = true && that.isSetFateId(); + if (this_present_fateId || that_present_fateId) { + if (!(this_present_fateId && that_present_fateId)) return false; - if (this.fateTxId != that.fateTxId) + if (!this.fateId.equals(that.fateId)) return false; } @@ -770,7 +771,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (isSetKind()) hashCode = hashCode * 8191 + kind.getValue(); - hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(fateTxId); + hashCode = hashCode * 8191 + ((isSetFateId()) ? 131071 : 524287); + if (isSetFateId()) + hashCode = hashCode * 8191 + fateId.hashCode(); hashCode = hashCode * 8191 + ((isSetOverrides()) ? 131071 : 524287); if (isSetOverrides()) @@ -857,12 +860,12 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return lastComparison; } } - lastComparison = java.lang.Boolean.compare(isSetFateTxId(), other.isSetFateTxId()); + lastComparison = java.lang.Boolean.compare(isSetFateId(), other.isSetFateId()); if (lastComparison != 0) { return lastComparison; } - if (isSetFateTxId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fateTxId, other.fateTxId); + if (isSetFateId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fateId, other.fateId); if (lastComparison != 0) { return lastComparison; } @@ -953,8 +956,12 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } first = false; if (!first) sb.append(", "); - sb.append("fateTxId:"); - sb.append(this.fateTxId); + sb.append("fateId:"); + if (this.fateId == null) { + sb.append("null"); + } else { + sb.append(this.fateId); + } first = false; if (!first) sb.append(", "); sb.append("overrides:"); @@ -977,6 +984,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (iteratorSettings != null) { iteratorSettings.validate(); } + if (fateId != null) { + fateId.validate(); + } } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -1086,10 +1096,11 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 8: // FATE_TX_ID - if (schemeField.type == org.apache.thrift.protocol.TType.I64) { - struct.fateTxId = iprot.readI64(); - struct.setFateTxIdIsSet(true); + case 8: // FATE_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.fateId.read(iprot); + struct.setFateIdIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1170,9 +1181,11 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal oprot.writeI32(struct.kind.getValue()); oprot.writeFieldEnd(); } - oprot.writeFieldBegin(FATE_TX_ID_FIELD_DESC); - oprot.writeI64(struct.fateTxId); - oprot.writeFieldEnd(); + if (struct.fateId != null) { + oprot.writeFieldBegin(FATE_ID_FIELD_DESC); + struct.fateId.write(oprot); + oprot.writeFieldEnd(); + } if (struct.overrides != null) { oprot.writeFieldBegin(OVERRIDES_FIELD_DESC); { @@ -1226,7 +1239,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (struct.isSetKind()) { optionals.set(6); } - if (struct.isSetFateTxId()) { + if (struct.isSetFateId()) { optionals.set(7); } if (struct.isSetOverrides()) { @@ -1260,8 +1273,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (struct.isSetKind()) { oprot.writeI32(struct.kind.getValue()); } - if (struct.isSetFateTxId()) { - oprot.writeI64(struct.fateTxId); + if (struct.isSetFateId()) { + struct.fateId.write(oprot); } if (struct.isSetOverrides()) { { @@ -1320,8 +1333,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal struct.setKindIsSet(true); } if (incoming.get(7)) { - struct.fateTxId = iprot.readI64(); - struct.setFateTxIdIsSet(true); + struct.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.fateId.read(iprot); + struct.setFateIdIsSet(true); } if (incoming.get(8)) { { diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index e473b4ad81..231e8f5809 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -111,7 +111,7 @@ struct TExternalCompactionJob { 5:string outputFile 6:bool propagateDeletes 7:TCompactionKind kind - 8:i64 fateTxId + 8:manager.TFateId fateId 9:map<string, string> overrides } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index e3cfc83fa2..36f2ea06e1 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -373,7 +373,8 @@ public class TabletMetadataTest { assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); - TabletOperationId opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 55); + TabletOperationId opid1 = + TabletOperationId.from(TabletOperationType.SPLITTING, FateId.from(type, 55)); TabletMetadata tm2 = TabletMetadata.builder(extent).putOperation(opid1).build(LOCATION); assertEquals(extent, tm2.getExtent()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index c8d6f9dba6..5851e7224e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -267,7 +267,7 @@ public class MetadataConstraints implements Constraint { violations = addViolation(violations, 11); } } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { - if (!FateId.isFormattedTid(columnQualifier.toString())) { + if (!FateId.isFateId(columnQualifier.toString())) { violations = addViolation(violations, 13); } } else if (columnFamily.equals(BulkFileColumnFamily.NAME)) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 018d473a76..bac4d3d4cc 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; @@ -167,8 +168,8 @@ public interface VolumeManager extends AutoCloseable { * This operation should be idempotent to allow calling multiple times in the case of a partial * completion. */ - void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException; + void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName, FateId fateId) + throws IOException; // forward to the appropriate FileSystem object boolean moveToTrash(Path sourcePath) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b47dedabe0..1c1e7a4fd5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -44,6 +44,7 @@ import java.util.stream.Stream; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.spi.fs.VolumeChooser; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -321,7 +322,7 @@ public class VolumeManagerImpl implements VolumeManager { @Override public void bulkRename(Map<Path,Path> oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException { + FateId fateId) throws IOException { List<Future<Void>> results = new ArrayList<>(); ExecutorService workerPool = ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, poolName, false); @@ -337,14 +338,14 @@ public class VolumeManagerImpl implements VolumeManager { } log.debug( "Ignoring rename exception for {} because destination already exists. orig: {} new: {}", - transactionId, oldPath, newPath, e); + fateId, oldPath, newPath, e); success = true; } if (!success && (!exists(newPath) || exists(oldPath))) { - throw new IOException("Rename operation " + transactionId + " returned false. orig: " - + oldPath + " new: " + newPath); + throw new IOException("Rename operation " + fateId + " returned false. orig: " + oldPath + + " new: " + newPath); } else if (log.isTraceEnabled()) { - log.trace("{} moved {} to {}", transactionId, oldPath, newPath); + log.trace("{} moved {} to {}", fateId, oldPath, newPath); } return null; }))); diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index a738e2fc3c..5360a54e95 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -462,7 +462,7 @@ public class MetadataConstraintsTest { assertViolation(mc, m, (short) 9); m = new Mutation(new Text("0;foo")); - ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE[123abc]")); + ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE:META:123abc")); violations = mc.check(createEnv(), m); assertNull(violations); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index c626fcb24d..1a1c2ece58 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -63,8 +63,6 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -213,17 +211,12 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac if (job.getKind() == TCompactionKind.USER) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044: TExternalCompactionJob.getFateTxId should be - // changed to - // TExternalCompactionJob.getFateId and return the FateId - FateInstanceType type = - FateInstanceType.fromTableId(KeyExtent.fromThrift(job.getExtent()).tableId()); - FateId fateId = FateId.from(type, job.getFateTxId()); - var cconf = CompactionConfigStorage.getConfig(getContext(), fateId); + var cconf = + CompactionConfigStorage.getConfig(getContext(), FateId.fromThrift(job.getFateId())); if (cconf == null) { LOG.info("Cancelling compaction {} for user compaction that no longer exists {} {}", - ecid, FateTxId.formatTid(job.getFateTxId()), extent); + ecid, FateId.fromThrift(job.getFateId()), extent); JOB_HOLDER.cancel(job.getExternalCompactionId()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 76af05395c..fdbf009352 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -572,11 +572,10 @@ public class CompactionCoordinator fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); } - // ELASTICITY_TODO DEFERRED - ISSUE 4044 return new TExternalCompactionJob(externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), fateId.getTid(), overrides); + TCompactionKind.valueOf(ecm.getKind().name()), fateId.toThrift(), overrides); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index e5ffc8039d..dce71c231d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -146,8 +146,7 @@ public class Utils { ZooReaderWriter zk = env.getContext().getZooReaderWriter(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 .. should the full FateId be passed below? - if (ZooReservation.attempt(zk, resvPath, fateId.getHexTid(), "")) { + if (ZooReservation.attempt(zk, resvPath, fateId, "")) { return 0; } else { return 50; @@ -158,13 +157,12 @@ public class Utils { throws KeeperException, InterruptedException { String resvPath = env.getContext().getZooKeeperRoot() + Constants.ZHDFS_RESERVATIONS + "/" + Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)); - ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId.getHexTid()); + ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId); } private static Lock getLock(ServerContext context, AbstractId<?> id, FateId fateId, boolean writeLock) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 ... should lock data use full FateId? - byte[] lockData = fateId.getHexTid().getBytes(UTF_8); + byte[] lockData = fateId.canonical().getBytes(UTF_8); var fLockPath = FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical()); FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index 33cf61a0cc..3b5b2fbbee 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -105,8 +105,7 @@ class BulkImportMove extends ManagerRepo { oldToNewMap.put(originalPath, newPath); } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId.getHexTid()); + fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java index 3871c3cdf5..95816691a6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java @@ -52,8 +52,7 @@ public class RefreshTablets extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit, tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId)); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index 57d3616482..a4f7928060 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -39,7 +39,7 @@ import java.util.function.Supplier; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -60,12 +60,12 @@ public class TabletRefresher { private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class); public static void refresh(ServerContext context, - Supplier<Set<TServerInstance>> onlineTserversSupplier, long fateTxid, TableId tableId, + Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, TableId tableId, byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) { // ELASTICITY_TODO should this thread pool be configurable? - ThreadPoolExecutor threadPool = context.threadPools().createFixedThreadPool(10, - "Tablet refresh " + FateTxId.formatTid(fateTxid), false); + ThreadPoolExecutor threadPool = + context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false); try (var tablets = context.getAmple().readTablets().forTable(tableId) .overlapping(startRow, endRow).checkConsistency() @@ -86,7 +86,7 @@ public class TabletRefresher { var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation, mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList()))); - refreshTablets(threadPool, FateTxId.formatTid(fateTxid), context, onlineTserversSupplier, + refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier, refreshesNeeded); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 9c2ca4c2de..3e5c17b650 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -295,9 +295,8 @@ class CompactionDriver extends ManagerRepo { // For any compactions that may have happened before this operation failed, attempt to refresh // tablets. - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId.getTid(), tableId, - startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow, + endRow, tabletMetadata -> true); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java index aca1242276..fd4daf0c4c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java @@ -45,9 +45,8 @@ public class RefreshTablets extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), - tableId, startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, tableId, + startRow, endRow, tabletMetadata -> true); return new CleanUp(tableId, namespaceId, startRow, endRow); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index 91d62752e7..25c2b7a58f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -54,8 +54,7 @@ public class ReserveTablets extends ManagerRepo { @Override public long isReady(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId); // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java index 7df3561e1a..f071785ece 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -85,8 +85,7 @@ public class DeleteRows extends ManagerRepo { // Only delete data within the original extent specified by the user KeyExtent range = data.getOriginalExtent(); log.debug("{} deleting tablet files in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); try ( var tabletsMetadata = manager.getContext().getAmple().readTablets() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index 53c24959ae..f48885d546 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -60,8 +60,7 @@ public class DeleteTablets extends ManagerRepo { KeyExtent range = data.getMergeExtent(); log.debug("{} Deleting tablets for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 643a3428a9..65cefef1ab 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -61,8 +61,7 @@ class FinishTableRangeOp extends ManagerRepo { static void removeOperationIds(Logger log, MergeInfo data, FateId fateId, Manager manager) { KeyExtent range = data.getReserveExtent(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); log.debug("{} unreserving tablet in range {}", fateId, range); AtomicLong acceptedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index b46f7bd2ef..2b5b9c969c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -78,8 +78,7 @@ public class MergeTablets extends ManagerRepo { KeyExtent range = data.getMergeExtent(); log.debug("{} Merging metadata for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); Set<TabletAvailability> tabletAvailabilities = new HashSet<>(); MetadataTime maxLogicalTime = null; List<ReferenceFile> dirs = new ArrayList<>(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index df8bd977ba..b3d246572a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -55,8 +55,7 @@ public class ReserveTablets extends ManagerRepo { public long isReady(FateId fateId, Manager env) throws Exception { var range = data.getReserveExtent(); log.debug("{} reserving tablets in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong opsAccepted = new AtomicLong(0); Consumer<Ample.ConditionalResult> resultConsumer = result -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index e3262daa8c..dbf0d04f42 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -42,8 +42,7 @@ public class DeleteOperationIds extends ManagerRepo { @Override public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 813f525692..a973015ddc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -66,8 +66,7 @@ public class PreSplit extends ManagerRepo { // ELASTICITY_TODO intentionally not getting the table lock because not sure if its needed, // revist later when more operations are moved out of tablet server - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); // ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to // the same tablet. @@ -132,8 +131,7 @@ public class PreSplit extends ManagerRepo { TabletMetadata tabletMetadata = manager.getContext().getAmple() .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) { // the tablet no longer exists or we could not set the operation id, maybe another operation diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 27c5c39709..afc1a77b7a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -61,8 +61,7 @@ public class UpdateTablets extends ManagerRepo { TabletMetadata tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal()); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null) { // check to see if this operation has already succeeded. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java index 0bc2e86983..f0ffe01994 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java @@ -106,8 +106,7 @@ class MoveExportedFiles extends ManagerRepo { } } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId.getHexTid()); + fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null, TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 64cc5d03ff..404e04fccc 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -166,7 +166,7 @@ public class CompactionCoordinatorTest { protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress, ExternalCompactionId externalCompactionId) { - FateInstanceType type = FateInstanceType.fromTableId(tablet.getTableId()); + FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId()); FateId fateId = FateId.from(type, 1L); return new CompactionMetadata(jobFiles, new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")), @@ -180,7 +180,12 @@ public class CompactionCoordinatorTest { metaJob.getTabletMetadata().getExtent().toThrift(), List.of(), SystemIteratorUtil.toIteratorConfig(List.of()), ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), 1L, Map.of()); + TCompactionKind.valueOf(ecm.getKind().name()), + FateId + .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()), + 1L) + .toThrift(), + Map.of()); } @Override @@ -273,7 +278,6 @@ public class CompactionCoordinatorTest { TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class); expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); - expect(tm.getTableId()).andReturn(ke.tableId()); EasyMock.replay(tconf, context, creds, tm, security); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 7076bf44ce..3ecb58bd6c 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -57,6 +57,8 @@ import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -267,7 +269,9 @@ public class ScanServerIT extends SharedMiniClusterBase { 1_000); // Set operationIds on all the table's tablets so that they won't be loaded. - TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, 1234L); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId = FateId.from(type, 1234L); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Ample ample = getCluster().getServerContext().getAmple(); ServerAmpleImpl sai = (ServerAmpleImpl) ample; try (TabletsMutator tm = sai.mutateTablets()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index a193357597..2b3a8972f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -700,8 +700,11 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var context = cluster.getServerContext(); - var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]"); - var opid2 = TabletOperationId.from("MERGING:FATE[5678]"); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId1 = FateId.from(type, "1234"); + FateId fateId2 = FateId.from(type, "5678"); + var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false); @@ -832,7 +835,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(LocationType.CURRENT, loc.getType()); assertNull(rootMeta.getOperationId()); - TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, 7); + FateInstanceType type = FateInstanceType.fromTableId(RootTable.EXTENT.tableId()); + FateId fateId = FateId.from(type, 7); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation() @@ -1201,7 +1206,9 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { }; // run a test where a subset of tablets are modified, all modifications should be accepted - var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50); + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId1 = FateId.from(type, 50); + var opid1 = TabletOperationId.from(TabletOperationType.MERGING, fateId1); int expected = 0; try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); @@ -1222,7 +1229,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { // run test where some will be accepted and some will be rejected and ensure the counts come // out as expected. - var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51); + FateId fateId2 = FateId.from(type, 51); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); accepted.set(0); total.set(0); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 1d9096f819..2e61925e65 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -62,6 +62,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -382,6 +384,10 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { String tableName = super.getUniqueNames(1)[0]; var tableId = TableId.of(prepTableForScanTest(c, tableName)); + + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId = FateId.from(type, 42L); + assertEquals(0, countTabletsWithLocation(c, tableId)); assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream() @@ -394,7 +400,7 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // to not be assigned try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); @@ -420,7 +426,7 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // to be unhosted try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index ff6f4d34cb..c88989d1c4 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -56,6 +56,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.state.TabletManagement; @@ -322,7 +324,9 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { // Sets an operation type on all tablets up to the end row private void setOperationId(AccumuloClient client, String table, String tableNameToModify, Text end, TabletOperationType opType) throws TableNotFoundException { - var opid = TabletOperationId.from(opType, 42L); + FateInstanceType type = FateInstanceType.fromNamespaceOrTableName(table); + FateId fateId = FateId.from(type, 42L); + var opid = TabletOperationId.from(opType, fateId); TableId tableIdToModify = TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); try (TabletsMetadata tabletsMetadata =