keith-turner commented on code in PR #5726:
URL: https://github.com/apache/accumulo/pull/5726#discussion_r2208018904
##########
core/src/main/thrift/compaction-coordinator.thrift:
##########
@@ -112,6 +112,7 @@ service CompactionCoordinatorService {
2:security.TCredentials credentials
3:string externalCompactionId
4:data.TKeyExtent extent
+ 5:string exceptionClassName
Review Comment:
this may be a breaking change to thrift RPC? Or maybe the new field will be null/ignored when there is a version difference, so maybe it's ok. Also, the coordinator is experimental.
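For what it's worth, adding a field with a fresh ID is typically wire-compatible in Thrift: an older compactor simply never sets field 5, so it deserializes as null on the coordinator. A minimal hedged sketch of the null-tolerant handling this implies (the method and names here are illustrative, not the PR's actual code):
```java
// Hedged sketch: treat a missing exceptionClassName (old client, or no
// details available) as "no failure info", so mixed-version RPC keeps working.
static void onCompactionFailed(String externalCompactionId, String exceptionClassName) {
  if (exceptionClassName == null) {
    // sender predates field 5, or no exception details; skip failure capture
    System.out.printf("Compaction failed: id: %s%n", externalCompactionId);
    return;
  }
  System.out.printf("Compaction failed: id: %s, compactor exception: %s%n",
      externalCompactionId, exceptionClassName);
}
```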
##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1556,6 +1556,34 @@ public enum Property {
COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
    "The port used for handling client connections on the compactor servers.", "2.1.0"),
+ @Experimental
+ COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold", "3",
+     PropertyType.COUNT,
+     "The number of consecutive failures that must occur before the Compactor starts to back off"
+         + " processing compactions.",
+     "2.1.4"),
+ @Experimental
+ COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0",
+     PropertyType.TIMEDURATION,
+     "The time basis for computing the wait time for compaction failure backoff. A value of zero disables"
+         + " the backoff feature. When a non-zero value is supplied, then after compactor.failure.backoff.threshold"
+         + " failures have occurred, the compactor will wait compactor.failure.backoff.interval * the number of"
+         + " failures seconds before executing the next compaction. For example, if this value is 10s, then after"
+         + " three failures the Compactor will wait 30s before starting the next compaction. If the compaction fails"
+         + " again, then it will wait 40s before starting the next compaction.",
+     "2.1.4"),
+ @Experimental
+ @Experimental
+ COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m",
Review Comment:
Not recommending any changes here, just pondering something. Another way this could work is with a max backoff time instead of a reset time: once we get to that max time we stop incrementing the wait, but do not reset until a success is seen. Not coming up w/ any advantages for that other approach though. Wondering if there is any particular reason this reset-after-time approach was chosen?
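To make the two options concrete, here is a hedged sketch of the wait-time computation under each policy. The formula (wait = interval * failures once the threshold is reached) comes from the property descriptions above; the method and parameter names are illustrative:
```java
// Hedged sketch contrasting the two backoff policies discussed above.
class BackoffPolicies {

  // PR's approach as described: the failure count resets to zero after
  // compactor.failure.backoff.reset elapses with no new failure.
  static long waitWithReset(long failures, long threshold, long intervalMs,
      long msSinceLastFailure, long resetMs) {
    if (msSinceLastFailure >= resetMs) {
      failures = 0; // history expired, start over
    }
    return failures >= threshold ? intervalMs * failures : 0;
  }

  // Alternative pondered above: the wait grows until it hits a max, then
  // stays there until a success resets the count.
  static long waitWithMaxCap(long failures, long threshold, long intervalMs, long maxWaitMs) {
    if (failures < threshold) {
      return 0;
    }
    return Math.min(intervalMs * failures, maxWaitMs);
  }
}
```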
##########
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java:
##########
@@ -611,18 +620,89 @@ public void compactionCompleted(TInfo tinfo, TCredentials credentials,
  @Override
  public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
-     TKeyExtent extent) throws ThriftSecurityException {
+     TKeyExtent extent, String exceptionClassName) throws ThriftSecurityException {
    // do not expect users to call this directly, expect other tservers to call this method
    if (!security.canPerformSystemActions(credentials)) {
      throw new AccumuloSecurityException(credentials.getPrincipal(),
          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
    }
    KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
-   LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId, fromThriftExtent);
+   LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}", externalCompactionId,
+       fromThriftExtent, exceptionClassName);
    final var ecid = ExternalCompactionId.of(externalCompactionId);
+   if (exceptionClassName != null) {
+     captureFailure(ecid, fromThriftExtent);
+   }
    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
  }
+ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+   var rc = RUNNING_CACHE.get(ecid);
+   if (rc != null) {
+     final String queue = rc.getQueueName();
+     failingQueues.computeIfAbsent(queue, q -> new AtomicLong(0)).incrementAndGet();
+     final String compactor = rc.getCompactorAddress();
+     failingCompactors.computeIfAbsent(compactor, c -> new AtomicLong(0)).incrementAndGet();
+   }
+   failingTables.computeIfAbsent(extent.tableId(), t -> new AtomicLong(0)).incrementAndGet();
+ }
+
+ protected void startFailureSummaryLogging() {
+   ScheduledFuture<?> future = getContext().getScheduledExecutor()
+       .scheduleWithFixedDelay(this::printFailures, 0, 5, TimeUnit.MINUTES);
+   ThreadPools.watchNonCriticalScheduledTask(future);
+ }
+
+ private void printFailures() {
+
+   // Remove down compactors from failing list
+   Map<String,List<HostAndPort>> allCompactors =
+       ExternalCompactionUtil.getCompactorAddrs(getContext());
+   Set<String> allCompactorAddrs = new HashSet<>();
+   allCompactors.values().forEach(l -> l.forEach(c -> allCompactorAddrs.add(c.toString())));
+   failingCompactors.keySet().retainAll(allCompactorAddrs);
+
Review Comment:
Another way this tracking could work is that it counts successes and failures. Each time it logs, it takes a snapshot of the counts, logs them, and then deducts the snapshot it logged. This will give an indication of the successes and failures since the last time this function ran. Also, could only log when there is a failure, and maybe log one line per queue/compactor/table (interpreting large maps in the log can be difficult). Maybe something like the following.
```java
// this is not really correct, it's assuming we also get a snapshot of the
// values in the map... but that is not really true
Map<String,SuccessFailureCounts> queueSnapshot = Map.copyOf(failingQueues);
queueSnapshot.forEach((queue, counts) -> {
  if (counts.failures > 0) {
    LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue,
        counts.successes, counts.failures, logInterval);
    // TODO decrement counts logged from failingQueues, by decrementing only the
    // counts logged we do not lose any concurrent increments made while logging
  }
});
```
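One hedged way to make that snapshot-and-deduct idea safe under concurrent updates is to drain `LongAdder` counters per interval. The `SuccessFailureCounts` type, `failingQueues` map, and recording methods below are illustrative names, not the PR's actual code:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class QueueFailureStats {
  private static final Logger LOG = LoggerFactory.getLogger(QueueFailureStats.class);

  // hypothetical per-queue counter pair
  static class SuccessFailureCounts {
    final LongAdder successes = new LongAdder();
    final LongAdder failures = new LongAdder();
  }

  private final Map<String,SuccessFailureCounts> failingQueues = new ConcurrentHashMap<>();

  void recordSuccess(String queue) {
    failingQueues.computeIfAbsent(queue, q -> new SuccessFailureCounts()).successes.increment();
  }

  void recordFailure(String queue) {
    failingQueues.computeIfAbsent(queue, q -> new SuccessFailureCounts()).failures.increment();
  }

  // Logs one line per queue that saw a failure, then starts the next interval
  // from zero. sumThenReset() drains each adder; a few increments racing the
  // reset may land in either interval, which is fine for periodic stats.
  void logAndReset(long logIntervalMs) {
    failingQueues.forEach((queue, counts) -> {
      long failures = counts.failures.sumThenReset();
      long successes = counts.successes.sumThenReset();
      if (failures > 0) {
        LOG.warn("Queue {} had {} successes and {} failures in the last {}ms", queue, successes,
            failures, logIntervalMs);
      }
    });
  }
}
```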
##########
core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java:
##########
@@ -66,5 +68,5 @@ default void init(ContextClassLoaderEnvironment env) {}
* consulting this plugin.
* @return the class loader for the given contextName
*/
- ClassLoader getClassLoader(String contextName);
+  ClassLoader getClassLoader(String contextName) throws IOException, ReflectiveOperationException;
Review Comment:
Is there a benefit to these two specific exceptions? If we want information to travel through the code via a checked exception, then it may be better to create a very specific exception related to this SPI. This allows knowing that class loader creation failed w/o trying to guess at the specific reasons/exceptions that could cause it; the specific reason should be in the cause. In general we may want to know this type of failure happened, but we probably do not care too much why it happened. Whenever it happens, for any reason, it's not good.
```java
// maybe this should extend Exception
/**
 * @since 2.1.4
 */
public static class ClassLoaderCreationFailed extends AccumuloException {
  public ClassLoaderCreationFailed(Throwable cause) {
    super(cause);
  }

  public ClassLoaderCreationFailed(String msg, Throwable cause) {
    super(msg, cause);
  }
}

ClassLoader getClassLoader(String contextName) throws ClassLoaderCreationFailed;
```
We could also leave this SPI as-is and create a new internal exception that is always thrown when class loader creation fails. This allows this very specific and important information to travel through the internal code. Could do the more minimal change below in 2.1 and add the checked exception to the SPI in 4.0. Not opposed to adding a checked exception to the SPI in 2.1.4 though; we would need to document the breaking change in the release notes.
```java
public class ClassLoaderUtil {
  // create this class outside of public API... any code in this class that
  // attempts to create a classloader and fails should throw this exception
  // this could be a checked or runtime exception... not sure which is best
  public static class ClassLoaderCreationFailed extends RuntimeException {
    public ClassLoaderCreationFailed(Throwable cause) {
      super(cause);
    }
  }

  public static ClassLoader getClassLoader(String context) {
    try {
      return FACTORY.getClassLoader(context);
    } catch (Exception e) {
      throw new ClassLoaderCreationFailed(e);
    }
  }
}
```
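For illustration, a hedged sketch of how an internal call site could then react to class loader creation failure as one well-defined event, without enumerating the underlying causes (the `PluginLoader` wrapper and the `Runnable` plugin type are hypothetical, building on the `ClassLoaderUtil` sketch above):
```java
public class PluginLoader {
  public static Runnable loadPlugin(String context, String className)
      throws ReflectiveOperationException {
    try {
      ClassLoader loader = ClassLoaderUtil.getClassLoader(context);
      return loader.loadClass(className).asSubclass(Runnable.class)
          .getDeclaredConstructor().newInstance();
    } catch (ClassLoaderUtil.ClassLoaderCreationFailed e) {
      // the context class loader itself could not be created; the root
      // cause travels along in e.getCause()
      throw e;
    }
  }
}
```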
##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
  err.set(null);
  JOB_HOLDER.reset();
+ // consecutive failure processing
+ final long totalFailures =
+     errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+ if (totalFailures > 0) {
+   LOG.warn("This Compactor has had {} consecutive failures. Failures: {}", totalFailures,
+       errorHistory);
Review Comment:
Wondering how this will look; it's always relogging the entire error history. Also, it seems like it will only log the first exception seen for a table, is that the intent? I will know more about how this looks when I run some tests.
##########
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java:
##########
@@ -712,6 +717,44 @@ public void run() {
  err.set(null);
  JOB_HOLDER.reset();
+ // consecutive failure processing
+ final long totalFailures =
+     errorHistory.values().stream().mapToLong(p -> p.getSecond().get()).sum();
+ if (totalFailures > 0) {
Review Comment:
Could all of this be moved into a function? The body of this while loop is already long, and it's already difficult to work out its higher-level structure.
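For example, a hedged sketch of what the extraction might look like; the `errorHistory` shape (table id to a pair of first exception name and consecutive failure count) is guessed from the diff above, and all names here are illustrative:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FailureBackoff {
  private static final Logger LOG = LoggerFactory.getLogger(FailureBackoff.class);

  // guessed shape: table id -> (first exception class name seen, consecutive failure count)
  private final Map<String,Map.Entry<String,AtomicLong>> errorHistory = new ConcurrentHashMap<>();

  // pulled out of the while loop body: sum the consecutive failures and, once
  // past the threshold, wait interval * failures before the next compaction
  void backoffOnConsecutiveFailures(long threshold, long intervalMs) throws InterruptedException {
    final long totalFailures =
        errorHistory.values().stream().mapToLong(e -> e.getValue().get()).sum();
    if (totalFailures >= threshold && intervalMs > 0) {
      long waitMs = intervalMs * totalFailures;
      LOG.warn("This Compactor has had {} consecutive failures, backing off {}ms", totalFailures,
          waitMs);
      Thread.sleep(waitMs);
    }
  }
}
```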
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]