This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 569c150 re #1452: compactor wip 569c150 is described below commit 569c150ce9993dc0ce069181c4f17ce35d71873b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 2 22:23:23 2021 +0000 re #1452: compactor wip --- .../java/org/apache/accumulo/core/Constants.java | 4 +- .../compaction/thrift/CompactionCoordinator.java | 36 +- .../core/compaction/thrift/CompactionJob.java | 274 +++++++- .../core/compaction/thrift/CompactionKind.java | 66 ++ .../accumulo/core/compaction/thrift/InputFile.java | 687 +++++++++++++++++++++ .../org/apache/accumulo/core/conf/Property.java | 25 +- core/src/main/thrift/compaction-coordinator.thrift | 33 +- pom.xml | 1 + .../apache/accumulo/server/fs}/FileManager.java | 2 +- .../accumulo/server/fs}/TooManyFilesException.java | 2 +- .../iterators}/TabletIteratorEnvironment.java | 5 +- .../accumulo/server/manager/LiveTServerSet.java | 2 +- server/compactor/.gitignore | 28 + server/compactor/pom.xml | 38 ++ .../org/apache/accumulo/compactor/Compactor.java | 616 ++++++++++++++++++ .../accumulo/compactor/RetryableThriftCall.java | 92 +++ .../compactor/RetryableThriftFunction.java} | 14 +- .../accumulo/tserver/ConditionCheckerContext.java | 1 + .../tserver/TabletServerResourceManager.java | 3 +- .../accumulo/tserver/ThriftClientHandler.java | 1 + .../accumulo/tserver/scan/NextBatchTask.java | 2 +- .../accumulo/tserver/tablet/CompactableUtils.java | 2 +- .../accumulo/tserver/tablet/MinorCompactor.java | 2 +- .../accumulo/tserver/tablet/ScanDataSource.java | 4 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- 25 files changed, 1872 insertions(+), 70 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 70353f2..3276eb2 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -59,9 +59,11 @@ public class Constants { public static final String ZTSERVERS = "/tservers"; + public static final String ZCOMPACTORS = "/compactors"; + public static final String ZCOORDINATOR = "/coordinators"; public static final String ZCOORDINATOR_LOCK = "/coordinators/lock"; - + public static final String ZDEAD = "/dead"; public static final String ZDEADTSERVERS = ZDEAD + "/tservers"; diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java index 4f4fb31..ffd00f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java +++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java @@ -2594,14 +2594,14 @@ public class CompactionCoordinator { case 0: // SUCCESS if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list0 = iprot.readListBegin(); - struct.success = new java.util.ArrayList<Status>(_list0.size); - @org.apache.thrift.annotation.Nullable Status _elem1; - for (int _i2 = 0; _i2 < _list0.size; ++_i2) + org.apache.thrift.protocol.TList _list8 = iprot.readListBegin(); + struct.success = new java.util.ArrayList<Status>(_list8.size); + @org.apache.thrift.annotation.Nullable Status _elem9; + for (int _i10 = 0; _i10 < _list8.size; ++_i10) { - _elem1 = new Status(); - _elem1.read(iprot); - struct.success.add(_elem1); + _elem9 = new Status(); + _elem9.read(iprot); + struct.success.add(_elem9); } iprot.readListEnd(); } @@ -2629,9 +2629,9 @@ public class CompactionCoordinator { oprot.writeFieldBegin(SUCCESS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.success.size())); - for (Status _iter3 : struct.success) + for (Status _iter11 : struct.success) { - _iter3.write(oprot); + _iter11.write(oprot); } oprot.writeListEnd(); } @@ -2662,9 +2662,9 @@ public class CompactionCoordinator { if (struct.isSetSuccess()) { { oprot.writeI32(struct.success.size()); - for (Status _iter4 : struct.success) + for (Status _iter12 : struct.success) { - _iter4.write(oprot); + _iter12.write(oprot); } } } @@ -2676,14 +2676,14 @@ public class CompactionCoordinator { java.util.BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { { - org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.success = new java.util.ArrayList<Status>(_list5.size); - @org.apache.thrift.annotation.Nullable Status _elem6; - for (int _i7 = 0; _i7 < _list5.size; ++_i7) + org.apache.thrift.protocol.TList _list13 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.success = new java.util.ArrayList<Status>(_list13.size); + @org.apache.thrift.annotation.Nullable Status _elem14; + for (int _i15 = 0; _i15 < _list13.size; ++_i15) { - _elem6 = new Status(); - _elem6.read(iprot); - struct.success.add(_elem6); + _elem14 = new Status(); + _elem14.read(iprot); + struct.success.add(_elem14); } } struct.setSuccessIsSet(true); diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java index 66fa0ff..ece8893 100644 --- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java +++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java @@ -40,6 +40,8 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)11); private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.I32, (short)12); private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)13); + private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)14); + private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)15); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionJobStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionJobTupleSchemeFactory(); @@ -48,7 +50,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required public long compactionId; // required public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; // required - public @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files; // required + public @org.apache.thrift.annotation.Nullable java.util.List<InputFile> files; // required public int priority; // required public int readRate; // required public int writeRate; // required @@ -64,6 +66,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com */ public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.tabletserver.thrift.CompactionReason reason; // required public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required + public boolean propagateDeletes; // required + /** + * + * @see CompactionKind + */ + public @org.apache.thrift.annotation.Nullable CompactionKind kind; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -86,7 +94,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com * @see org.apache.accumulo.core.tabletserver.thrift.CompactionReason */ REASON((short)12, "reason"), - OUTPUT_FILE((short)13, "outputFile"); + OUTPUT_FILE((short)13, "outputFile"), + PROPAGATE_DELETES((short)14, "propagateDeletes"), + /** + * + * @see CompactionKind + */ + KIND((short)15, "kind"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -126,6 +140,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com return REASON; case 13: // OUTPUT_FILE return OUTPUT_FILE; + case 14: // PROPAGATE_DELETES + return PROPAGATE_DELETES; + case 15: // KIND + return KIND; default: return null; } @@ -171,6 +189,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com private static final int __PRIORITY_ISSET_ID = 1; private static final int __READRATE_ISSET_ID = 2; private static final int __WRITERATE_ISSET_ID = 3; + private static final int __PROPAGATEDELETES_ISSET_ID = 4; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -185,7 +204,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class))); tmpMap.put(_Fields.FILES, new org.apache.thrift.meta_data.FieldMetaData("files", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, InputFile.class)))); tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.READ_RATE, new org.apache.thrift.meta_data.FieldMetaData("readRate", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -200,6 +219,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, org.apache.accumulo.core.tabletserver.thrift.CompactionReason.class))); tmpMap.put(_Fields.OUTPUT_FILE, new org.apache.thrift.meta_data.FieldMetaData("outputFile", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PROPAGATE_DELETES, new org.apache.thrift.meta_data.FieldMetaData("propagateDeletes", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CompactionKind.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionJob.class, metaDataMap); } @@ -212,14 +235,16 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, long compactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, - java.util.List<java.lang.String> files, + java.util.List<InputFile> files, int priority, int readRate, int writeRate, org.apache.accumulo.core.tabletserver.thrift.IteratorConfig iteratorSettings, org.apache.accumulo.core.tabletserver.thrift.CompactionType type, org.apache.accumulo.core.tabletserver.thrift.CompactionReason reason, - java.lang.String outputFile) + java.lang.String outputFile, + boolean propagateDeletes, + CompactionKind kind) { this(); this.traceInfo = traceInfo; @@ -238,6 +263,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com this.type = type; this.reason = reason; this.outputFile = outputFile; + this.propagateDeletes = propagateDeletes; + setPropagateDeletesIsSet(true); + this.kind = kind; } /** @@ -256,7 +284,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com this.extent = new org.apache.accumulo.core.dataImpl.thrift.TKeyExtent(other.extent); } if (other.isSetFiles()) { - java.util.List<java.lang.String> __this__files = new java.util.ArrayList<java.lang.String>(other.files); + java.util.List<InputFile> __this__files = new java.util.ArrayList<InputFile>(other.files.size()); + for (InputFile other_element : other.files) { + __this__files.add(new InputFile(other_element)); + } this.files = __this__files; } this.priority = other.priority; @@ -274,6 +305,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (other.isSetOutputFile()) { this.outputFile = other.outputFile; } + this.propagateDeletes = other.propagateDeletes; + if (other.isSetKind()) { + this.kind = other.kind; + } } public CompactionJob deepCopy() { @@ -298,6 +333,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com this.type = null; this.reason = null; this.outputFile = null; + setPropagateDeletesIsSet(false); + this.propagateDeletes = false; + this.kind = null; } @org.apache.thrift.annotation.Nullable @@ -403,23 +441,23 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com } @org.apache.thrift.annotation.Nullable - public java.util.Iterator<java.lang.String> getFilesIterator() { + public java.util.Iterator<InputFile> getFilesIterator() { return (this.files == null) ? null : this.files.iterator(); } - public void addToFiles(java.lang.String elem) { + public void addToFiles(InputFile elem) { if (this.files == null) { - this.files = new java.util.ArrayList<java.lang.String>(); + this.files = new java.util.ArrayList<InputFile>(); } this.files.add(elem); } @org.apache.thrift.annotation.Nullable - public java.util.List<java.lang.String> getFiles() { + public java.util.List<InputFile> getFiles() { return this.files; } - public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files) { + public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<InputFile> files) { this.files = files; return this; } @@ -624,6 +662,62 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com } } + public boolean isPropagateDeletes() { + return this.propagateDeletes; + } + + public CompactionJob setPropagateDeletes(boolean propagateDeletes) { + this.propagateDeletes = propagateDeletes; + setPropagateDeletesIsSet(true); + return this; + } + + public void unsetPropagateDeletes() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID); + } + + /** Returns true if field propagateDeletes is set (has been assigned a value) and false otherwise */ + public boolean isSetPropagateDeletes() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID); + } + + public void setPropagateDeletesIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID, value); + } + + /** + * + * @see CompactionKind + */ + @org.apache.thrift.annotation.Nullable + public CompactionKind getKind() { + return this.kind; + } + + /** + * + * @see CompactionKind + */ + public CompactionJob setKind(@org.apache.thrift.annotation.Nullable CompactionKind kind) { + this.kind = kind; + return this; + } + + public void unsetKind() { + this.kind = null; + } + + /** Returns true if field kind is set (has been assigned a value) and false otherwise */ + public boolean isSetKind() { + return this.kind != null; + } + + public void setKindIsSet(boolean value) { + if (!value) { + this.kind = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case TRACE_INFO: @@ -662,7 +756,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (value == null) { unsetFiles(); } else { - setFiles((java.util.List<java.lang.String>)value); + setFiles((java.util.List<InputFile>)value); } break; @@ -722,6 +816,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com } break; + case PROPAGATE_DELETES: + if (value == null) { + unsetPropagateDeletes(); + } else { + setPropagateDeletes((java.lang.Boolean)value); + } + break; + + case KIND: + if (value == null) { + unsetKind(); + } else { + setKind((CompactionKind)value); + } + break; + } } @@ -764,6 +874,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com case OUTPUT_FILE: return getOutputFile(); + case PROPAGATE_DELETES: + return isPropagateDeletes(); + + case KIND: + return getKind(); + } throw new java.lang.IllegalStateException(); } @@ -799,6 +915,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com return isSetReason(); case OUTPUT_FILE: return isSetOutputFile(); + case PROPAGATE_DELETES: + return isSetPropagateDeletes(); + case KIND: + return isSetKind(); } throw new java.lang.IllegalStateException(); } @@ -926,6 +1046,24 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com return false; } + boolean this_present_propagateDeletes = true; + boolean that_present_propagateDeletes = true; + if (this_present_propagateDeletes || that_present_propagateDeletes) { + if (!(this_present_propagateDeletes && that_present_propagateDeletes)) + return false; + if (this.propagateDeletes != that.propagateDeletes) + return false; + } + + boolean this_present_kind = true && this.isSetKind(); + boolean that_present_kind = true && that.isSetKind(); + if (this_present_kind || that_present_kind) { + if (!(this_present_kind && that_present_kind)) + return false; + if (!this.kind.equals(that.kind)) + return false; + } + return true; } @@ -973,6 +1111,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (isSetOutputFile()) hashCode = hashCode * 8191 + outputFile.hashCode(); + hashCode = hashCode * 8191 + ((propagateDeletes) ? 131071 : 524287); + + hashCode = hashCode * 8191 + ((isSetKind()) ? 131071 : 524287); + if (isSetKind()) + hashCode = hashCode * 8191 + kind.getValue(); + return hashCode; } @@ -1104,6 +1248,26 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(isSetPropagateDeletes()).compareTo(other.isSetPropagateDeletes()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetPropagateDeletes()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.propagateDeletes, other.propagateDeletes); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetKind()).compareTo(other.isSetKind()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetKind()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.kind, other.kind); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1204,6 +1368,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com sb.append(this.outputFile); } first = false; + if (!first) sb.append(", "); + sb.append("propagateDeletes:"); + sb.append(this.propagateDeletes); + first = false; + if (!first) sb.append(", "); + sb.append("kind:"); + if (this.kind == null) { + sb.append("null"); + } else { + sb.append(this.kind); + } + first = false; sb.append(")"); return sb.toString(); } @@ -1300,11 +1476,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { org.apache.thrift.protocol.TList _list0 = iprot.readListBegin(); - struct.files = new java.util.ArrayList<java.lang.String>(_list0.size); - @org.apache.thrift.annotation.Nullable java.lang.String _elem1; + struct.files = new java.util.ArrayList<InputFile>(_list0.size); + @org.apache.thrift.annotation.Nullable InputFile _elem1; for (int _i2 = 0; _i2 < _list0.size; ++_i2) { - _elem1 = iprot.readString(); + _elem1 = new InputFile(); + _elem1.read(iprot); struct.files.add(_elem1); } iprot.readListEnd(); @@ -1371,6 +1548,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 14: // PROPAGATE_DELETES + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.propagateDeletes = iprot.readBool(); + struct.setPropagateDeletesIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 15: // KIND + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.kind = org.apache.accumulo.core.compaction.thrift.CompactionKind.findByValue(iprot.readI32()); + struct.setKindIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1407,10 +1600,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (struct.files != null) { oprot.writeFieldBegin(FILES_FIELD_DESC); { - oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.files.size())); - for (java.lang.String _iter3 : struct.files) + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.files.size())); + for (InputFile _iter3 : struct.files) { - oprot.writeString(_iter3); + _iter3.write(oprot); } oprot.writeListEnd(); } @@ -1445,6 +1638,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com oprot.writeString(struct.outputFile); oprot.writeFieldEnd(); } + oprot.writeFieldBegin(PROPAGATE_DELETES_FIELD_DESC); + oprot.writeBool(struct.propagateDeletes); + oprot.writeFieldEnd(); + if (struct.kind != null) { + oprot.writeFieldBegin(KIND_FIELD_DESC); + oprot.writeI32(struct.kind.getValue()); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1499,7 +1700,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (struct.isSetOutputFile()) { optionals.set(11); } - oprot.writeBitSet(optionals, 12); + if (struct.isSetPropagateDeletes()) { + optionals.set(12); + } + if (struct.isSetKind()) { + optionals.set(13); + } + oprot.writeBitSet(optionals, 14); if (struct.isSetTraceInfo()) { struct.traceInfo.write(oprot); } @@ -1515,9 +1722,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (struct.isSetFiles()) { { oprot.writeI32(struct.files.size()); - for (java.lang.String _iter4 : struct.files) + for (InputFile _iter4 : struct.files) { - oprot.writeString(_iter4); + _iter4.write(oprot); } } } @@ -1542,12 +1749,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com if (struct.isSetOutputFile()) { oprot.writeString(struct.outputFile); } + if (struct.isSetPropagateDeletes()) { + oprot.writeBool(struct.propagateDeletes); + } + if (struct.isSetKind()) { + oprot.writeI32(struct.kind.getValue()); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, CompactionJob struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(12); + java.util.BitSet incoming = iprot.readBitSet(14); if (incoming.get(0)) { struct.traceInfo = new org.apache.accumulo.core.trace.thrift.TInfo(); struct.traceInfo.read(iprot); @@ -1569,12 +1782,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com } if (incoming.get(4)) { { - org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.files = new java.util.ArrayList<java.lang.String>(_list5.size); - @org.apache.thrift.annotation.Nullable java.lang.String _elem6; + org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.files = new java.util.ArrayList<InputFile>(_list5.size); + @org.apache.thrift.annotation.Nullable InputFile _elem6; for (int _i7 = 0; _i7 < _list5.size; ++_i7) { - _elem6 = iprot.readString(); + _elem6 = new InputFile(); + _elem6.read(iprot); struct.files.add(_elem6); } } @@ -1609,6 +1823,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com struct.outputFile = iprot.readString(); struct.setOutputFileIsSet(true); } + if (incoming.get(12)) { + struct.propagateDeletes = iprot.readBool(); + struct.setPropagateDeletesIsSet(true); + } + if (incoming.get(13)) { + struct.kind = org.apache.accumulo.core.compaction.thrift.CompactionKind.findByValue(iprot.readI32()); + struct.setKindIsSet(true); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java new file mode 100644 index 0000000..4d05032 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.accumulo.core.compaction.thrift; + + +public enum CompactionKind implements org.apache.thrift.TEnum { + CHOP(0), + SELECTOR(1), + SYSTEM(2), + USER(3); + + private final int value; + + private CompactionKind(int value) { + this.value = value; + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } + + /** + * Find a the enum type by its integer value, as defined in the Thrift IDL. + * @return null if the value is not found. + */ + @org.apache.thrift.annotation.Nullable + public static CompactionKind findByValue(int value) { + switch (value) { + case 0: + return CHOP; + case 1: + return SELECTOR; + case 2: + return SYSTEM; + case 3: + return USER; + default: + return null; + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java new file mode 100644 index 0000000..d191a8b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.accumulo.core.compaction.thrift; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +public class InputFile implements org.apache.thrift.TBase<InputFile, InputFile._Fields>, java.io.Serializable, Cloneable, Comparable<InputFile> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InputFile"); + + private static final org.apache.thrift.protocol.TField METADATA_FILE_ENTRY_FIELD_DESC = new org.apache.thrift.protocol.TField("metadataFileEntry", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("size", org.apache.thrift.protocol.TType.I64, (short)2); + private static final org.apache.thrift.protocol.TField ENTRIES_FIELD_DESC = new org.apache.thrift.protocol.TField("entries", org.apache.thrift.protocol.TType.I64, (short)3); + private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)4); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InputFileStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InputFileTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry; // required + public long size; // required + public long entries; // required + public long timestamp; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + METADATA_FILE_ENTRY((short)1, "metadataFileEntry"), + SIZE((short)2, "size"), + ENTRIES((short)3, "entries"), + TIMESTAMP((short)4, "timestamp"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // METADATA_FILE_ENTRY + return METADATA_FILE_ENTRY; + case 2: // SIZE + return SIZE; + case 3: // ENTRIES + return ENTRIES; + case 4: // TIMESTAMP + return TIMESTAMP; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __SIZE_ISSET_ID = 0; + private static final int __ENTRIES_ISSET_ID = 1; + private static final int __TIMESTAMP_ISSET_ID = 2; + private byte __isset_bitfield = 0; + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.METADATA_FILE_ENTRY, new org.apache.thrift.meta_data.FieldMetaData("metadataFileEntry", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.SIZE, new org.apache.thrift.meta_data.FieldMetaData("size", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.ENTRIES, new org.apache.thrift.meta_data.FieldMetaData("entries", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InputFile.class, metaDataMap); + } + + public InputFile() { + } + + public InputFile( + java.lang.String metadataFileEntry, + long size, + long entries, + long timestamp) + { + this(); + this.metadataFileEntry = metadataFileEntry; + this.size = size; + setSizeIsSet(true); + this.entries = entries; + setEntriesIsSet(true); + this.timestamp = timestamp; + setTimestampIsSet(true); + } + + /** + * Performs a deep copy on <i>other</i>. + */ + public InputFile(InputFile other) { + __isset_bitfield = other.__isset_bitfield; + if (other.isSetMetadataFileEntry()) { + this.metadataFileEntry = other.metadataFileEntry; + } + this.size = other.size; + this.entries = other.entries; + this.timestamp = other.timestamp; + } + + public InputFile deepCopy() { + return new InputFile(this); + } + + @Override + public void clear() { + this.metadataFileEntry = null; + setSizeIsSet(false); + this.size = 0; + setEntriesIsSet(false); + this.entries = 0; + setTimestampIsSet(false); + this.timestamp = 0; + } + + @org.apache.thrift.annotation.Nullable + public java.lang.String getMetadataFileEntry() { + return this.metadataFileEntry; + } + + public InputFile setMetadataFileEntry(@org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry) { + this.metadataFileEntry = metadataFileEntry; + return this; + } + + public void unsetMetadataFileEntry() { + this.metadataFileEntry = null; + } + + /** Returns true if field metadataFileEntry is set (has been assigned a value) and false otherwise */ + public boolean isSetMetadataFileEntry() { + return this.metadataFileEntry != null; + } + + public void setMetadataFileEntryIsSet(boolean value) { + if (!value) { + this.metadataFileEntry = null; + } + } + + public long getSize() { + return this.size; + } + + public InputFile setSize(long size) { + this.size = size; + setSizeIsSet(true); + return this; + } + + public void unsetSize() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SIZE_ISSET_ID); + } + + /** Returns true if field size is set (has been assigned a value) and false otherwise */ + public boolean isSetSize() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SIZE_ISSET_ID); + } + + public void setSizeIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SIZE_ISSET_ID, value); + } + + public long getEntries() { + return this.entries; + } + + public InputFile setEntries(long entries) { + this.entries = entries; + setEntriesIsSet(true); + return this; + } + + public void unsetEntries() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __ENTRIES_ISSET_ID); + } + + /** Returns true if field entries is set (has been assigned a value) and false otherwise */ + public boolean isSetEntries() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __ENTRIES_ISSET_ID); + } + + public void setEntriesIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIES_ISSET_ID, value); + } + + public long getTimestamp() { + return this.timestamp; + } + + public InputFile setTimestamp(long timestamp) { + this.timestamp = timestamp; + setTimestampIsSet(true); + return this; + } + + public void unsetTimestamp() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TIMESTAMP_ISSET_ID); + } + + /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */ + public boolean isSetTimestamp() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TIMESTAMP_ISSET_ID); + } + + public void setTimestampIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TIMESTAMP_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + case METADATA_FILE_ENTRY: + if (value == null) { + unsetMetadataFileEntry(); + } else { + setMetadataFileEntry((java.lang.String)value); + } + break; + + case SIZE: + if (value == null) { + unsetSize(); + } else { + setSize((java.lang.Long)value); + } + break; + + case ENTRIES: + if (value == null) { + unsetEntries(); + } else { + setEntries((java.lang.Long)value); + } + break; + + case TIMESTAMP: + if (value == null) { + unsetTimestamp(); + } else { + setTimestamp((java.lang.Long)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case METADATA_FILE_ENTRY: + return getMetadataFileEntry(); + + case SIZE: + return getSize(); + + case ENTRIES: + return getEntries(); + + case TIMESTAMP: + return getTimestamp(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case METADATA_FILE_ENTRY: + return isSetMetadataFileEntry(); + case SIZE: + return isSetSize(); + case ENTRIES: + return isSetEntries(); + case TIMESTAMP: + return isSetTimestamp(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof InputFile) + return this.equals((InputFile)that); + return false; + } + + public boolean equals(InputFile that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_metadataFileEntry = true && this.isSetMetadataFileEntry(); + boolean that_present_metadataFileEntry = true && that.isSetMetadataFileEntry(); + if (this_present_metadataFileEntry || that_present_metadataFileEntry) { + if (!(this_present_metadataFileEntry && that_present_metadataFileEntry)) + return false; + if (!this.metadataFileEntry.equals(that.metadataFileEntry)) + return false; + } + + boolean this_present_size = true; + boolean that_present_size = true; + if (this_present_size || that_present_size) { + if (!(this_present_size && that_present_size)) + return false; + if (this.size != that.size) + return false; + } + + boolean this_present_entries = true; + boolean that_present_entries = true; + if (this_present_entries || that_present_entries) { + if (!(this_present_entries && that_present_entries)) + return false; + if (this.entries != that.entries) + return false; + } + + boolean this_present_timestamp = true; + boolean that_present_timestamp = true; + if (this_present_timestamp || that_present_timestamp) { + if (!(this_present_timestamp && that_present_timestamp)) + return false; + if (this.timestamp != that.timestamp) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetMetadataFileEntry()) ? 131071 : 524287); + if (isSetMetadataFileEntry()) + hashCode = hashCode * 8191 + metadataFileEntry.hashCode(); + + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(size); + + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entries); + + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(timestamp); + + return hashCode; + } + + @Override + public int compareTo(InputFile other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetMetadataFileEntry()).compareTo(other.isSetMetadataFileEntry()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMetadataFileEntry()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.metadataFileEntry, other.metadataFileEntry); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetSize()).compareTo(other.isSetSize()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSize()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.size, other.size); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetEntries()).compareTo(other.isSetEntries()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetEntries()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.entries, other.entries); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetTimestamp()).compareTo(other.isSetTimestamp()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTimestamp()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, other.timestamp); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("InputFile("); + boolean first = true; + + sb.append("metadataFileEntry:"); + if (this.metadataFileEntry == null) { + sb.append("null"); + } else { + sb.append(this.metadataFileEntry); + } + first = false; + if (!first) sb.append(", "); + sb.append("size:"); + sb.append(this.size); + first = false; + if (!first) sb.append(", "); + sb.append("entries:"); + sb.append(this.entries); + first = false; + if (!first) sb.append(", "); + sb.append("timestamp:"); + sb.append(this.timestamp); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class InputFileStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public InputFileStandardScheme getScheme() { + return new InputFileStandardScheme(); + } + } + + private static class InputFileStandardScheme extends org.apache.thrift.scheme.StandardScheme<InputFile> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, InputFile struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // METADATA_FILE_ENTRY + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.metadataFileEntry = iprot.readString(); + struct.setMetadataFileEntryIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // SIZE + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.size = iprot.readI64(); + struct.setSizeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // ENTRIES + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.entries = iprot.readI64(); + struct.setEntriesIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 4: // TIMESTAMP + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.timestamp = iprot.readI64(); + struct.setTimestampIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, InputFile struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.metadataFileEntry != null) { + oprot.writeFieldBegin(METADATA_FILE_ENTRY_FIELD_DESC); + oprot.writeString(struct.metadataFileEntry); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(SIZE_FIELD_DESC); + oprot.writeI64(struct.size); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(ENTRIES_FIELD_DESC); + oprot.writeI64(struct.entries); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC); + oprot.writeI64(struct.timestamp); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class InputFileTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public InputFileTupleScheme getScheme() { + return new InputFileTupleScheme(); + } + } + + private static class InputFileTupleScheme extends org.apache.thrift.scheme.TupleScheme<InputFile> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetMetadataFileEntry()) { + optionals.set(0); + } + if (struct.isSetSize()) { + optionals.set(1); + } + if (struct.isSetEntries()) { + optionals.set(2); + } + if (struct.isSetTimestamp()) { + optionals.set(3); + } + oprot.writeBitSet(optionals, 4); + if (struct.isSetMetadataFileEntry()) { + oprot.writeString(struct.metadataFileEntry); + } + if (struct.isSetSize()) { + oprot.writeI64(struct.size); + } + if (struct.isSetEntries()) { + oprot.writeI64(struct.entries); + } + if (struct.isSetTimestamp()) { + oprot.writeI64(struct.timestamp); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(4); + if (incoming.get(0)) { + struct.metadataFileEntry = iprot.readString(); + struct.setMetadataFileEntryIsSet(true); + } + if (incoming.get(1)) { + struct.size = iprot.readI64(); + struct.setSizeIsSet(true); + } + if (incoming.get(2)) { + struct.entries = iprot.readI64(); + struct.setEntriesIsSet(true); + } + if (incoming.get(3)) { + struct.timestamp = iprot.readI64(); + struct.setTimestampIsSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + private static void unusedMethod() {} +} + diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f05794f..48b983b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1024,6 +1024,21 @@ public enum Property { REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION, "Amount of time for a single replication RPC call to last before failing" + " the attempt. See replication.work.attempts."), + // Compactor properties + COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX, + "Properties in this category affect the behavior of the accumulo compactor server."), + COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN, + "if the ports above are in use, search higher ports until one is available"), + COMPACTOR_CLIENTPORT("compactor.port.client", "9100", PropertyType.PORT, + "The port used for handling client connections on the compactor servers"), + COMPACTOR_MINTHREADS("compactor.server.threads.minimum", "1", PropertyType.COUNT, + "The minimum number of threads to use to handle incoming requests."), + COMPACTOR_MINTHREADS_TIMEOUT("compactor.server.threads.timeout", "0s", PropertyType.TIMEDURATION, + "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."), + COMPACTOR_THREADCHECK("compactor.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, + "The time between adjustments of the server thread pool."), + COMPACTOR_MAX_MESSAGE_SIZE("compactor.server.message.size.max", "10M", PropertyType.BYTES, + "The maximum size of a message that can be sent to a tablet server."), // CompactionCoordinator properties COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compaction coordinator server."), @@ -1033,12 +1048,18 @@ public enum Property { "The port used for handling client connections on the compactor servers"), COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), - COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", PropertyType.TIMEDURATION, + COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", + PropertyType.TIMEDURATION, "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."), COORDINATOR_THREADCHECK("coordinator.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), COORDINATOR_MAX_MESSAGE_SIZE("coordinator.server.message.size.max", "10M", PropertyType.BYTES, - "The maximum size of a message that can be sent to a tablet server."), // deprecated properties grouped at the end to reference property that replaces them + "The maximum size of a message that can be sent to a tablet server."), // deprecated + // properties grouped + // at the end to + // reference property + // that replaces them + // deprecated properties grouped at the end to reference property that replaces them @Deprecated(since = "1.6.0") @ReplacedBy(property = INSTANCE_VOLUMES) INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI, diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift index cc702bc..7cc2caf 100644 --- a/core/src/main/thrift/compaction-coordinator.thrift +++ b/core/src/main/thrift/compaction-coordinator.thrift @@ -39,6 +39,37 @@ enum CompactionState { CANCELLED } +struct InputFile { + 1:string metadataFileEntry + 2:i64 size + 3:i64 entries + 4:i64 timestamp +} + +enum CompactionKind { + CHOP + SELECTOR + SYSTEM + USER +} + +struct CompactionJob { + 1:trace.TInfo traceInfo + 2:security.TCredentials credentials + 3:i64 compactionId + 5:data.TKeyExtent extent + 6:list<InputFile> files + 7:i32 priority + 8:i32 readRate + 9:i32 writeRate + 10:tabletserver.IteratorConfig iteratorSettings + 11:tabletserver.CompactionType type + 12:tabletserver.CompactionReason reason + 13:string outputFile + 14:bool propagateDeletes + 15:CompactionKind kind +} + struct Status { 1:i64 timestamp 2:i64 compactionId @@ -104,4 +135,4 @@ service Compactor { 1:tabletserver.CompactionJob compaction ) -} \ No newline at end of file +} diff --git a/pom.xml b/pom.xml index d2bd521..1d74ca2 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ <module>iterator-test-harness</module> <module>minicluster</module> <module>server/base</module> + <module>server/compactor</module> <module>server/compaction-coordinator</module> <module>server/gc</module> <module>server/manager</module> diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java similarity index 99% rename from server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java rename to server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index e0d70fc..1335e44 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.tserver; +package org.apache.accumulo.server.fs; import java.io.IOException; import java.util.ArrayList; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java similarity index 96% copy from server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java copy to server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java index 243d36c..7adae11 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.tserver; +package org.apache.accumulo.server.fs; import java.io.IOException; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java similarity index 97% rename from server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java rename to server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java index 9d4467d..6311200 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.tserver; +package org.apache.accumulo.server.iterators; import java.io.IOException; import java.util.ArrayList; @@ -41,8 +41,7 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; -import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; -import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.server.fs.FileManager.ScanFileManager; import org.apache.hadoop.fs.Path; public class TabletIteratorEnvironment implements SystemIteratorEnvironment { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index b318835..4f8e894 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -77,7 +77,7 @@ public class LiveTServerSet implements Watcher { public TServerConnection(HostAndPort addr) { address = addr; } - + public HostAndPort getAddress() { return address; } diff --git a/server/compactor/.gitignore b/server/compactor/.gitignore new file mode 100644 index 0000000..e77a822 --- /dev/null +++ b/server/compactor/.gitignore @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Maven ignores +/target/ + +# IDE ignores +/.settings/ +/.project +/.classpath +/.pydevproject +/.idea +/*.iml +/nbproject/ +/nbactions.xml +/nb-configuration.xml diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml new file mode 100644 index 0000000..1ed866f --- /dev/null +++ b/server/compactor/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-project</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>accumulo-compactor</artifactId> + <name>Apache Accumulo Compactor</name> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-server-base</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java new file mode 100644 index 0000000..75d81cf --- /dev/null +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator; +import org.apache.accumulo.core.compaction.thrift.CompactionJob; +import org.apache.accumulo.core.compaction.thrift.CompactionState; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.compaction.CompactionInfo; +import org.apache.accumulo.server.compaction.CompactionStats; +import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; +import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +public class Compactor extends AbstractServer + implements org.apache.accumulo.core.compaction.thrift.Compactor.Iface { + + private static class CompactorServerOpts extends ServerOpts { + @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name") + private String queueName = null; + + public String getQueueName() { + return queueName; + } + } + + /** + * Object used to hold information about the current compaction + */ + private static class CompactionJobHolder { + private CompactionJob job; + private Thread compactionThread; + private volatile Boolean cancelled = Boolean.FALSE; + private CompactionStats stats = null; + + public CompactionJobHolder() {} + + public void reset() { + job = null; + compactionThread = null; + cancelled = Boolean.FALSE; + stats = null; + } + + public CompactionJob getJob() { + return job; + } + + public Thread getThread() { + return compactionThread; + } + + public CompactionStats getStats() { + return stats; + } + + public void setStats(CompactionStats stats) { + this.stats = stats; + } + + public void cancel() { + cancelled = Boolean.TRUE; + } + + public boolean isCancelled() { + return cancelled; + } + + public boolean isSet() { + return (null != this.job); + } + + public void set(CompactionJob job, Thread compactionThread) { + Objects.requireNonNull(job, "CompactionJob is null"); + Objects.requireNonNull(compactionThread, "Compaction thread is null"); + this.job = job; + this.compactionThread = compactionThread; + } + + } + + /** + * Utility for returning the address in the form host:port + * + * @return host and port for Compactor client connections + */ + private static String getHostPortString(HostAndPort address) { + if (address == null) { + return null; + } + return address.getHost() + ":" + address.getPort(); + } + + public static final String COMPACTOR_SERVICE = "COMPACTOR_SVC"; + + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); + private static final long TIME_BETWEEN_GC_CHECKS = 5000; + private static final long MAX_WAIT_TIME = 60000; + + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final UUID compactorId = UUID.randomUUID(); + private final AccumuloConfiguration aconf; + private final String queueName; + private final CompactionJobHolder jobHolder; + private ZooLock compactorLock; + + Compactor(CompactorServerOpts opts, String[] args) { + super("compactor", opts, args); + queueName = opts.getQueueName(); + ServerContext context = super.getContext(); + context.setupCrypto(); + + this.jobHolder = new CompactionJobHolder(); + aconf = getConfiguration(); + ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( + () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_GC_CHECKS, + TimeUnit.MILLISECONDS); + LOG.info("Version " + Constants.VERSION); + LOG.info("Instance " + context.getInstanceID()); + } + + /** + * Set up nodes and locks in ZooKeeper for this Compactor + * + * @param clientAddress + * address of this Compactor + * + * @throws KeeperException + * @throws InterruptedException + */ + private void announceExistence(HostAndPort clientAddress) + throws KeeperException, InterruptedException { + + String hostPort = getHostPortString(clientAddress); + + ZooReaderWriter zoo = getContext().getZooReaderWriter(); + String compactorQueuePath = + getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName; + String zPath = compactorQueuePath + "/" + hostPort; + + try { + zoo.mkdirs(compactorQueuePath); + zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); + } catch (KeeperException e) { + if (e.code() == KeeperException.Code.NOAUTH) { + LOG.error("Failed to write to ZooKeeper. Ensure that" + + " accumulo.properties, specifically instance.secret, is consistent."); + } + throw e; + } + + compactorLock = new ZooLock(getContext().getSiteConfiguration(), zPath, compactorId); + LockWatcher lw = new LockWatcher() { + @Override + public void lostLock(final LockLossReason reason) { + Halt.halt(1, () -> { + LOG.error("Compactor lost lock (reason = {}), exiting.", reason); + gcLogger.logGCInfo(getConfiguration()); + }); + } + + @Override + public void unableToMonitorLockNode(final Exception e) { + Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e)); + } + }; + + try { + byte[] lockContent = (hostPort + "=" + COMPACTOR_SERVICE).getBytes(UTF_8); + for (int i = 0; i < 25; i++) { + zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); + + if (compactorLock.tryLock(lw, lockContent)) { + LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath()); + return; + } + LOG.info("Waiting for Compactor lock"); + sleepUninterruptibly(5, TimeUnit.SECONDS); + } + String msg = "Too many retries, exiting."; + LOG.info(msg); + throw new RuntimeException(msg); + } catch (Exception e) { + LOG.info("Could not obtain tablet server lock, exiting.", e); + throw new RuntimeException(e); + } + } + + /** + * Get the address of the CompactionCoordinator + * + * @return address of Coordinator + */ + private HostAndPort getCoordinatorAddress() { + try { + // TODO: Get the coordinator location from ZooKeeper + List<String> locations = null; + if (locations.isEmpty()) { + return null; + } + return HostAndPort.fromString(locations.get(0)); + } catch (Exception e) { + LOG.warn("Failed to obtain manager host " + e); + } + + return null; + } + + /** + * Start this Compactors thrift service to handle incoming client requests + * + * @return address of this compactor client service + * @throws UnknownHostException + */ + private ServerAddress startCompactorClientService() throws UnknownHostException { + Compactor rpcProxy = TraceUtil.wrapService(this); + final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor; + if (getContext().getThriftServerType() == ThriftServerType.SASL) { + Compactor tcredProxy = + TCredentialsUpdatingWrapper.service(rpcProxy, Compactor.class, getConfiguration()); + processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(tcredProxy); + } else { + processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(rpcProxy); + } + Property maxMessageSizeProperty = (aconf.get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null + ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); + ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(), + Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), + "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, + Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK, + maxMessageSizeProperty); + LOG.info("address = {}", sp.address); + return sp; + } + + /** + * Called by a thrift client to cancel the currently running compaction if it matches the supplied + * job + * + * @param compactionJob + * job + */ + @Override + public void cancel(CompactionJob compactionJob) { + synchronized (jobHolder) { + if (jobHolder.isSet() && jobHolder.getJob().equals(compactionJob)) { + LOG.info("Cancel requested for compaction job {}", compactionJob); + jobHolder.cancel(); + jobHolder.getThread().interrupt(); + } + } + } + + /** + * Send an update to the coordinator for this job + * + * @param coordinatorClient + * address of the CompactionCoordinator + * @param job + * compactionJob + * @param state + * updated state + * @param message + * updated message + */ + private void updateCompactionState(CompactionCoordinator.Client coordinatorClient, + CompactionJob job, CompactionState state, String message) { + RetryableThriftCall<Void> thriftCall = + new RetryableThriftCall<>(1000, MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() { + @Override + public Void execute() throws TException { + coordinatorClient.updateCompactionState(job, state, message, + System.currentTimeMillis()); + return null; + } + }); + thriftCall.run(); + } + + /** + * Get the next job to run + * + * @param coordinatorClient + * address of the CompactionCoordinator + * @param compactorAddress + * address of this Compactor + * @return CompactionJob + */ + private CompactionJob getNextJob(CompactionCoordinator.Client coordinatorClient, + String compactorAddress) { + RetryableThriftCall<CompactionJob> nextJobThriftCall = new RetryableThriftCall<>(1000, + MAX_WAIT_TIME, 0, new RetryableThriftFunction<CompactionJob>() { + @Override + public CompactionJob execute() throws TException { + return coordinatorClient.getCompactionJob(queueName, compactorAddress); + } + }); + return nextJobThriftCall.run(); + } + + @Override + public void run() { + + ServerAddress compactorAddress = null; + try { + compactorAddress = startCompactorClientService(); + } catch (UnknownHostException e1) { + throw new RuntimeException("Failed to start the compactor client service", e1); + } + final HostAndPort clientAddress = compactorAddress.address; + + try { + announceExistence(clientAddress); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Erroring registering in ZooKeeper", e); + } + + HostAndPort coordinatorHost = getCoordinatorAddress(); + if (null == coordinatorHost) { + throw new RuntimeException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + LOG.info("CompactionCoordinator address is: {}", coordinatorHost); + CompactionCoordinator.Client coordinatorClient; + try { + coordinatorClient = ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(), + coordinatorHost, getContext()); + } catch (TTransportException e2) { + throw new RuntimeException("Erroring connecting to CompactionCoordinator", e2); + } + + LOG.info("Compactor started, waiting for work"); + try { + + final AtomicReference<Throwable> err = new AtomicReference<>(); + + while (true) { + err.set(null); + jobHolder.reset(); + final CompactionJob job = getNextJob(coordinatorClient, getHostPortString(clientAddress)); + + LOG.info("Received next compaction job: {}", job); + + final LongAdder totalInputSize = new LongAdder(); + final LongAdder totalInputEntries = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); + + Thread compactionThread = Threads.createThread( + "Compaction job for tablet " + job.getExtent().toString(), new Runnable() { + @Override + public void run() { + try { + LOG.info("Setting up to run compactor"); + updateCompactionState(coordinatorClient, job, CompactionState.STARTED, + "Compaction started"); + + final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8)); + final TableConfiguration tConfig = getContext().getTableConfiguration(tableId); + + final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>(); + job.getFiles().forEach(f -> { + files.put(new StoredTabletFile(f.getMetadataFileEntry()), + new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp())); + totalInputSize.add(f.getSize()); + totalInputEntries.add(f.getEntries()); + }); + + final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile())); + + final CompactionEnv cenv = new CompactionEnv() { + @Override + public boolean isCompactionEnabled() { + return !jobHolder.isCancelled(); + } + + @Override + public IteratorScope getIteratorScope() { + return IteratorScope.majc; + } + + @Override + public RateLimiter getReadLimiter() { + return SharedRateLimiterFactory.getInstance(getContext().getConfiguration()) + .create("read_rate_limiter", () -> job.getReadRate()); + } + + @Override + public RateLimiter getWriteLimiter() { + return SharedRateLimiterFactory.getInstance(getContext().getConfiguration()) + .create("write_rate_limiter", () -> job.getWriteRate()); + } + + @Override + public SystemIteratorEnvironment createIteratorEnv(ServerContext context, + AccumuloConfiguration acuTableConf, TableId tableId) { + return new TabletIteratorEnvironment(getContext(), IteratorScope.majc, + !job.isPropagateDeletes(), acuTableConf, tableId, + CompactionKind.valueOf(job.getKind().name())); + } + + @Override + public SortedKeyValueIterator<Key,Value> getMinCIterator() { + throw new UnsupportedOperationException(); + } + + @Override + public CompactionReason getReason() { + switch (job.getKind()) { + case USER: + return CompactionReason.USER; + case CHOP: + return CompactionReason.CHOP; + case SELECTOR: + case SYSTEM: + default: + return CompactionReason.SYSTEM; + } + } + }; + + final List<IteratorSetting> iters = new ArrayList<>(); + job.getIteratorSettings().getIterators() + .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); + + org.apache.accumulo.server.compaction.Compactor compactor = + new org.apache.accumulo.server.compaction.Compactor(getContext(), + KeyExtent.fromThrift(job.getExtent()), files, outputFile, + job.isPropagateDeletes(), cenv, iters, tConfig); + + LOG.info("Starting compactor"); + started.countDown(); + jobHolder.setStats(compactor.call()); + + LOG.info("Compaction completed successfully"); + // Update state when completed + updateCompactionState(coordinatorClient, job, CompactionState.SUCCEEDED, + "Compaction completed successfully"); + } catch (Exception e) { + LOG.error("Compaction failed", e); + err.set(e); + throw new RuntimeException("Compaction failed", e); + } finally { + stopped.countDown(); + // TODO: Any cleanup + } + } + }); + + synchronized (jobHolder) { + jobHolder.set(job, compactionThread); + } + + compactionThread.start(); // start the compactionThread + try { + started.await(); // wait until the compactor is started + long inputEntries = totalInputEntries.sum(); + while (stopped.getCount() > 0) { + List<CompactionInfo> running = + org.apache.accumulo.server.compaction.Compactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + String message = String.format( + "Compaction in progress, read %d of %d input entries (%f %), written %d entries", + info.getEntriesRead(), inputEntries, + (info.getEntriesRead() / inputEntries) * 100, info.getEntriesWritten()); + LOG.info(message); + updateCompactionState(coordinatorClient, job, CompactionState.IN_PROGRESS, message); + } + } + UtilWaitThread.sleep(MAX_WAIT_TIME); + } + try { + compactionThread.join(); + CompactionStats stats = jobHolder.getStats(); + // TODO: Tell coordinator that we are finished, send stats. + + } catch (InterruptedException e) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to finish, cancelling job", + e); + cancel(job); + } + + } catch (InterruptedException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + cancel(job); + } + + if (compactionThread.isInterrupted()) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); + updateCompactionState(coordinatorClient, job, CompactionState.CANCELLED, + "Compaction cancelled"); + } + + Throwable thrown = err.get(); + if (thrown != null) { + updateCompactionState(coordinatorClient, job, CompactionState.FAILED, + "Compaction failed due to: " + thrown.getMessage()); + } + } + + } finally { + // close connection to coordinator + ThriftUtil.returnClient(coordinatorClient); + + // Shutdown local thrift server + LOG.debug("Stopping Thrift Servers"); + TServerUtils.stopTServer(compactorAddress.server); + + try { + LOG.debug("Closing filesystems"); + getContext().getVolumeManager().close(); + } catch (IOException e) { + LOG.warn("Failed to close filesystem : {}", e.getMessage(), e); + } + + gcLogger.logGCInfo(getConfiguration()); + LOG.info("stop requested. exiting ... "); + try { + compactorLock.unlock(); + } catch (Exception e) { + LOG.warn("Failed to release compactor lock", e); + } + } + + } + + public static void main(String[] args) throws Exception { + try (Compactor compactor = new Compactor(new CompactorServerOpts(), args)) { + compactor.runServer(); + } + } + +} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java new file mode 100644 index 0000000..5ce599e --- /dev/null +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.compactor; + +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RetryableThriftCall<T> { + + private static final Logger LOG = LoggerFactory.getLogger(RetryableThriftCall.class); + + private final long start; + private final long maxWaitTime; + private int maxNumRetries; + private final RetryableThriftFunction<T> function; + private final boolean retryForever; + + /** + * RetryableThriftCall constructor + * + * @param start + * initial wait time + * @param maxWaitTime + * max wait time + * @param maxNumRetries + * number of times to retry, 0 to retry forever + * @param function + * function to execute + */ + public RetryableThriftCall(long start, long maxWaitTime, int maxNumRetries, + RetryableThriftFunction<T> function) { + this.start = start; + this.maxWaitTime = maxWaitTime; + this.maxNumRetries = maxNumRetries; + this.function = function; + this.retryForever = (maxNumRetries == 0); + } + + /** + * Attempts to call the function, waiting and retrying when TException is thrown. Wait time is + * initially set to the start time and doubled each time, up to the maximum wait time. If + * maxNumRetries is 0, then this will retry forever. If maxNumRetries is non-zero, then a + * RuntimeException is thrown when it has exceeded he maxNumRetries parameter. + * + * @return T + * @throws RuntimeException + * when maximum number of retries has been exceeded + */ + public T run() { + long waitTime = start; + int numRetries = 0; + T result = null; + do { + try { + result = function.execute(); + } catch (TException e) { + LOG.error("Error in Thrift function talking to Coordinator, retrying in {}ms", waitTime); + if (!retryForever) { + numRetries++; + if (numRetries > maxNumRetries) { + throw new RuntimeException( + "Maximum number of retries (" + this.maxNumRetries + ") attempted."); + } + } + } + UtilWaitThread.sleep(waitTime); + if (waitTime != maxWaitTime) { + waitTime = Math.max(waitTime * 2, maxWaitTime); + } + } while (null == result); + return result; + } + +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java similarity index 77% rename from server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java rename to server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java index 243d36c..bfdbb4c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java @@ -16,15 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.tserver; +package org.apache.accumulo.compactor; -import java.io.IOException; +import org.apache.thrift.TException; -public class TooManyFilesException extends IOException { - - private static final long serialVersionUID = 1L; - - public TooManyFilesException(String msg) { - super(msg); - } +@FunctionalInterface +public interface RetryableThriftFunction<T> { + T execute() throws TException; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java index 7a8e180..9ccfcab 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.tserver.data.ServerConditionalMutation; import org.apache.hadoop.io.Text; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 300275e..77ae15b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -67,7 +67,8 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; -import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.server.fs.FileManager; +import org.apache.accumulo.server.fs.FileManager.ScanFileManager; import org.apache.accumulo.tserver.memory.LargestFirstMemoryManager; import org.apache.accumulo.tserver.memory.TabletMemoryReport; import org.apache.accumulo.tserver.session.ScanSession; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index fb108e9..7aff8d3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -122,6 +122,7 @@ import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.Compactor; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.data.ServerMutation; +import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.zookeeper.TransactionWatcher; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index a950290..6ff2a4c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -23,8 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.tserver.TabletServer; -import org.apache.accumulo.tserver.TooManyFilesException; import org.apache.accumulo.tserver.session.SingleScanSession; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Tablet; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index aae18e0..d6da2f8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -83,8 +83,8 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledExcepti import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.tserver.TabletIteratorEnvironment; import org.apache.accumulo.tserver.compaction.CompactionPlan; import org.apache.accumulo.tserver.compaction.CompactionStrategy; import org.apache.accumulo.tserver.compaction.MajorCompactionReason; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java index 75a424b..886cc68 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java @@ -46,12 +46,12 @@ import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.compaction.Compactor; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.MinorCompactionReason; -import org.apache.accumulo.tserver.TabletIteratorEnvironment; import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 1a9a98c..25d6c22 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -45,9 +45,9 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig; -import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.server.fs.FileManager.ScanFileManager; +import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; -import org.apache.accumulo.tserver.TabletIteratorEnvironment; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.scan.ScanParameters; import org.slf4j.Logger; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 6a83ea5..b6867f0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -93,6 +93,7 @@ import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; @@ -114,7 +115,6 @@ import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.TabletStatsKeeper; import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; -import org.apache.accumulo.tserver.TooManyFilesException; import org.apache.accumulo.tserver.TservConstraintEnv; import org.apache.accumulo.tserver.compactions.Compactable; import org.apache.accumulo.tserver.constraints.ConstraintChecker;