This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new b0317c7 Updates from testing b0317c7 is described below commit b0317c77a11e78f0683ddbe4b0f88d1c54b66f55 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 16 19:33:12 2021 +0000 Updates from testing Added SharedRateLimiterFactory::remove to remove the compaction limiter from the list when we are done to prevent the background threads from getting an NPE. Modified the CompactionCoordinator::getCompactionJob loop to try another server if the first one does not have a job. Modified Thrift calls so that they do not return null and instead return an empty object. Modified all RetryableThriftCall<Void> to RetryableThriftCall<String> so that the object does not retry the function when there was a successful call. --- .../util/ratelimit/SharedRateLimiterFactory.java | 14 ++ .../coordinator/CompactionCoordinator.java | 235 ++++++++++----------- .../accumulo/coordinator/RunningCompaction.java | 12 +- .../accumulo/compactor/CompactionEnvironment.java | 19 +- .../org/apache/accumulo/compactor/Compactor.java | 104 +++++---- .../accumulo/tserver/ThriftClientHandler.java | 3 +- .../tserver/compactions/ExternalCompactionJob.java | 2 + .../accumulo/tserver/tablet/CompactableImpl.java | 2 + 8 files changed, 201 insertions(+), 190 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java index c24a3d2..accfd95 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java @@ -99,6 +99,20 @@ public class SharedRateLimiterFactory { } /** + * Remove the rate limiter from the set of active limiters, if it exists + * + * @param name + * key for the rate limiter + */ + public void remove(String name) { + 
synchronized (activeLimiters) { + if (activeLimiters.containsKey(name)) { + activeLimiters.remove(name); + } + } + } + + /** * Walk through all of the currently active RateLimiters, having each update its current rate. * This is called periodically so that we can dynamically update as configuration changes. */ diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 5ede596..b4b2cb4 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -189,6 +189,9 @@ public class CompactionCoordinator extends AbstractServer // TODO: On initial startup contact all running tservers to get information about the // compactions that are current running in external queues to populate the RUNNING map. // This is to handle the case where the coordinator dies or is restarted at runtime + // + // Alternatively, we could use the status messages in updateCompactionStatus to rebuild + // the RUNNING map. 
tserverSet.startListeningForTabletServerChanges(); @@ -251,7 +254,7 @@ public class CompactionCoordinator extends AbstractServer // Find any running compactions for the tserver final List<ExternalCompactionId> toCancel = new ArrayList<>(); RUNNING.forEach((k, v) -> { - if (v.getCompactorAddress().equals(tsi)) { + if (v.getTserver().equals(tsi)) { toCancel.add(k); } }); @@ -295,63 +298,84 @@ public class CompactionCoordinator extends AbstractServer // CBUG need to use and check for system credentials LOG.debug("getCompactionJob " + queueName + " " + compactorAddress); String queue = queueName.intern(); - TServerInstance tserver = null; - Long priority = null; + TExternalCompactionJob result = null; // CBUG Review synchronization on QUEUES synchronized (QUEUES) { TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue); - if (null != m) { - while (tserver == null) { + if (null != m && !m.isEmpty()) { + while (result == null) { + + // m could become empty if we have contacted all tservers in this queue and + // there are no compactions + if (m.isEmpty()) { + LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, + compactorAddress); + result = new TExternalCompactionJob(); + break; + } + // Get the first TServerInstance from the highest priority queue Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry(); - priority = entry.getKey(); + Long priority = entry.getKey(); LinkedHashSet<TServerInstance> tservers = entry.getValue(); - if (null == tservers || m.isEmpty()) { + + if (null == tservers || tservers.isEmpty()) { // Clean up the map entry when no tservers for this queue and priority m.remove(entry.getKey(), entry.getValue()); continue; } else { - tserver = tservers.iterator().next(); + TServerInstance tserver = tservers.iterator().next(); + LOG.debug("Found tserver {} with priority {} for queue {}", tserver.getHostAndPort(), + priority, queue); // Remove the tserver from the list, we are going to run a 
compaction on this server tservers.remove(tserver); - if (tservers.size() == 0) { + if (tservers.isEmpty()) { // Clean up the map entry when no tservers remaining for this queue and priority + // CBUG This may be redundant as cleanup happens in the 'if' clause above m.remove(entry.getKey(), entry.getValue()); } HashSet<QueueAndPriority> qp = INDEX.get(tserver); qp.remove(QueueAndPriority.get(queue, priority)); - if (qp.size() == 0) { + if (qp.isEmpty()) { // Remove the tserver from the index INDEX.remove(tserver); } - break; + LOG.debug("Getting compaction for queue {} from tserver {}", queue, + tserver.getHostAndPort()); + // Get a compaction from the tserver + TabletClientService.Client client = null; + try { + client = getTabletServerConnection(tserver); + TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(), + getContext().rpcCreds(), queue, priority, compactorAddress); + if (null == job.getExternalCompactionId()) { + LOG.debug("No compactions found for queue {} on tserver {}, trying next tserver", + queue, tserver.getHostAndPort(), compactorAddress); + continue; + } + RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), + new RunningCompaction(job, compactorAddress, tserver)); + LOG.debug("Returning external job {} to {}", job.externalCompactionId, + compactorAddress); + result = job; + break; + } catch (TException e) { + LOG.error( + "Error from tserver {} while trying to reserve compaction, trying next tserver", + ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e); + } finally { + ThriftUtil.returnClient(client); + } } } + } else { + LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, + compactorAddress); + result = new TExternalCompactionJob(); } } + return result; - if (null == tserver) { - LOG.debug("No compactions found for queue {}, returning empty job to compactor {}", queue, - compactorAddress); - return new TExternalCompactionJob(); - } - - 
TabletClientService.Client client = null; - try { - client = getTabletServerConnection(tserver); - TExternalCompactionJob job = client.reserveCompactionJob(TraceUtil.traceInfo(), - getContext().rpcCreds(), queue, priority, compactorAddress); - RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), - new RunningCompaction(job, compactorAddress, tserver)); - LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); - return job; - } catch (TException e) { - LOG.error("Error reserving compaction from tserver {}", - ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e); - throw e; - } finally { - ThriftUtil.returnClient(client); - } } protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver) @@ -377,44 +401,52 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Compaction cancel requested, id: {}", externalCompactionId); RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); if (null == rc) { - throw new UnknownCompactionIdException(); + return; } - HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress()); - RetryableThriftCall<Void> cancelThriftCall = new RetryableThriftCall<>(1000, - RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<Void>() { - @Override - public Void execute() throws TException { - Compactor.Client compactorConnection = null; - try { - compactorConnection = getCompactorConnection(compactor); - compactorConnection.cancel(rc.getJob().getExternalCompactionId()); - return null; - } catch (TException e) { - throw e; - } finally { - ThriftUtil.returnClient(compactorConnection); + if (!rc.isCompleted()) { + HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress()); + RetryableThriftCall<String> cancelThriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<String>() { + @Override + public String execute() throws 
TException { + Compactor.Client compactorConnection = null; + try { + compactorConnection = getCompactorConnection(compactor); + compactorConnection.cancel(rc.getJob().getExternalCompactionId()); + return ""; + } catch (TException e) { + throw e; + } finally { + ThriftUtil.returnClient(compactorConnection); + } } - } - }); - try { - cancelThriftCall.run(); - } catch (RetriesExceededException e) { - LOG.error("Unable to contact Compactor {} to cancel running compaction {}", - rc.getCompactorAddress(), rc.getJob(), e); + }); + try { + cancelThriftCall.run(); + } catch (RetriesExceededException e) { + LOG.error("Unable to contact Compactor {} to cancel running compaction {}", + rc.getCompactorAddress(), rc.getJob(), e); + } } } + /** + * TServer calls getCompactionStatus to get information about the compaction + * + * @param externalCompactionId + * id + * @return compaction stats or null if not running + */ @Override public List<Status> getCompactionStatus(String externalCompactionId) throws TException { + List<Status> status = new ArrayList<>(); RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); - if (null == rc) { - throw new UnknownCompactionIdException(); + if (null != rc) { + rc.getUpdates().forEach((k, v) -> { + status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(), + rc.getCompactorAddress(), v.getState(), v.getMessage())); + }); } - List<Status> status = new ArrayList<>(); - rc.getUpdates().forEach((k, v) -> { - status.add(new Status(v.getTimestamp(), rc.getJob().getExternalCompactionId(), - rc.getCompactorAddress(), v.getState(), v.getMessage())); - }); return status; } @@ -434,6 +466,7 @@ public class CompactionCoordinator extends AbstractServer RunningCompaction rc = RUNNING.get(ecid); if (null != rc) { rc.setStats(stats); + rc.setCompleted(); } else { LOG.error( "Compaction completed called by Compactor for {}, but no running compaction for that id.", @@ -442,17 +475,19 @@ public class 
CompactionCoordinator extends AbstractServer } // Attempt up to ten times to contact the TServer and notify it that the compaction has // completed. - RetryableThriftCall<Void> completedThriftCall = new RetryableThriftCall<>(1000, - RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<Void>() { + RetryableThriftCall<String> completedThriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 10, new RetryableThriftFunction<String>() { @Override - public Void execute() throws TException { + public String execute() throws TException { TabletClientService.Client client = null; try { client = getTabletServerConnection(rc.getTserver()); client.compactionJobFinished(TraceUtil.traceInfo(), getContext().rpcCreds(), externalCompactionId, stats.fileSize, stats.entriesWritten); RUNNING.remove(ecid, rc); - return null; + LOG.info("TServer {} notified of compaction {} completion", + rc.getTserver().getHostAndPort(), externalCompactionId); + return ""; } catch (TException e) { throw e; } finally { @@ -461,64 +496,6 @@ public class CompactionCoordinator extends AbstractServer } }); try { - // CBUG Saw the following situation in testing: - // 1. Compactor ran a compaction and completed. - // 2. Compactor called this method - // 3. Thrift timeout occurred and the Compactor retried due to RetryableThriftCall - // 4. Upon retry this method returned UnknownCompactionIdException because no entry in RUNNING - - // See Method below where tserver could poll coordinator to see if compaction is completed. 
- - // "compactor" #38 prio=5 os_prio=0 cpu=157.59ms elapsed=197.99s tid=0x000055ea28438800 - // nid=0x4dae runnable [0x00007fb0c5f2e000] - // java.lang.Thread.State: RUNNABLE - // at sun.nio.ch.EPoll.wait(java.base@11.0.10/Native Method) - // at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@11.0.10/EPollSelectorImpl.java:120) - // at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@11.0.10/SelectorImpl.java:124) - // - locked <0x00000000f449acc0> (a sun.nio.ch.Util$2) - // - locked <0x00000000f449aa60> (a sun.nio.ch.EPollSelectorImpl) - // at sun.nio.ch.SelectorImpl.select(java.base@11.0.10/SelectorImpl.java:136) - // at - // org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336) - // at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157) - // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) - // at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) - // at java.io.FilterInputStream.read(java.base@11.0.10/FilterInputStream.java:133) - // at java.io.BufferedInputStream.fill(java.base@11.0.10/BufferedInputStream.java:252) - // at java.io.BufferedInputStream.read1(java.base@11.0.10/BufferedInputStream.java:292) - // at java.io.BufferedInputStream.read(java.base@11.0.10/BufferedInputStream.java:351) - // - locked <0x00000000f466c028> (a java.io.BufferedInputStream) - // at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) - // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) - // at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:132) - // at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:100) - // at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) - // at - // org.apache.accumulo.core.clientImpl.ThriftTransportPool$CachedTTransport.readAll(ThriftTransportPool.java:546) - // at 
org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:637) - // at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:505) - // at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) - // at - // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.recv_compactionCompleted(CompactionCoordinator.java:118) - // at - // org.apache.accumulo.core.compaction.thrift.CompactionCoordinator$Client.compactionCompleted(CompactionCoordinator.java:104) - // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:296) - // at org.apache.accumulo.compactor.Compactor$3.execute(Compactor.java:291) - // at - // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:102) - // at org.apache.accumulo.compactor.Compactor.updateCompactionCompleted(Compactor.java:304) - // at org.apache.accumulo.compactor.Compactor.run(Compactor.java:509) - - // "CompactionCoordinator-ClientPool-Worker-1" #47 daemon prio=5 os_prio=0 cpu=70.37ms - // elapsed=243.97s tid=0x00005590266cb800 nid=0x4db3 waiting on condition [0x00007f4c8cb6c000] - // java.lang.Thread.State: TIMED_WAITING (sleeping) - // at java.lang.Thread.sleep(java.base@11.0.10/Native Method) - // at org.apache.accumulo.fate.util.UtilWaitThread.sleep(UtilWaitThread.java:33) - // at - // org.apache.accumulo.server.compaction.RetryableThriftCall.run(RetryableThriftCall.java:113) - // at - // org.apache.accumulo.coordinator.CompactionCoordinator.compactionCompleted(CompactionCoordinator.java:462) - completedThriftCall.run(); } catch (RetriesExceededException e) { // TODO: What happens if tserver is no longer hosting tablet? 
I wonder if we should not notify @@ -548,13 +525,14 @@ public class CompactionCoordinator extends AbstractServer * * * @param externalCompactionId - * @return CompactionStats or null if not completed - * @throws TException + * @return CompactionStats + * @throws UnknownCompactionIdException + * if compaction is not running */ public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException { var ecid = ExternalCompactionId.of(externalCompactionId); RunningCompaction rc = RUNNING.get(ecid); - if (null != rc && null != rc.getStats()) { + if (null != rc && rc.isCompleted()) { RUNNING.remove(ecid, rc); return rc.getStats(); } else if (rc == null) { @@ -591,6 +569,9 @@ public class CompactionCoordinator extends AbstractServer if (null != rc) { rc.addUpdate(timestamp, message, state); } else { + // TODO: If the Coordinator was restarted, we could use these status messages + // to re-populate the RUNNING set. This would require the job, compactor address + // and TServerInstance throw new UnknownCompactionIdException(); } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java index cc36f50..ea35349 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java @@ -20,6 +20,7 @@ package org.apache.accumulo.coordinator; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.compaction.thrift.CompactionState; import org.apache.accumulo.core.metadata.TServerInstance; @@ -31,7 +32,8 @@ public class RunningCompaction { private final TExternalCompactionJob job; private final String compactorAddress; private final TServerInstance tserver; - private 
Map<Long,CompactionUpdate> updates = new TreeMap<>(); + private final Map<Long,CompactionUpdate> updates = new TreeMap<>(); + private final AtomicBoolean completed = new AtomicBoolean(Boolean.FALSE); private CompactionStats stats = null; RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) { @@ -69,4 +71,12 @@ public class RunningCompaction { return tserver; } + public boolean isCompleted() { + return completed.get(); + } + + public void setCompleted() { + completed.compareAndSet(false, true); + } + } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java index bfda224..fad132b 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.compactor; +import java.io.Closeable; +import java.io.IOException; + import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; @@ -33,14 +36,22 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; -public class CompactionEnvironment implements CompactionEnv { +public class CompactionEnvironment implements Closeable, CompactionEnv { private final ServerContext context; private final CompactionJobHolder jobHolder; + private final SharedRateLimiterFactory limiter; CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) { this.context = context; this.jobHolder = jobHolder; + this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()); + } + + @Override + public void close() throws 
IOException { + limiter.remove("read_rate_limiter"); + limiter.remove("write_rate_limiter"); } @Override @@ -55,14 +66,12 @@ public class CompactionEnvironment implements CompactionEnv { @Override public RateLimiter getReadLimiter() { - return SharedRateLimiterFactory.getInstance(context.getConfiguration()) - .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate()); + return limiter.create("read_rate_limiter", () -> jobHolder.getJob().getReadRate()); } @Override public RateLimiter getWriteLimiter() { - return SharedRateLimiterFactory.getInstance(context.getConfiguration()) - .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate()); + return limiter.create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate()); } @Override diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index f1d2454..7922c4f 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -65,7 +65,6 @@ import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.compaction.CompactionInfo; -import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; import org.apache.accumulo.server.compaction.ExternalCompactionUtil; import org.apache.accumulo.server.compaction.RetryableThriftCall; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; @@ -287,14 +286,14 @@ public class Compactor extends AbstractServer */ protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats) throws RetriesExceededException { - RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000, - RetryableThriftCall.MAX_WAIT_TIME, 25, new 
RetryableThriftFunction<Void>() { + RetryableThriftCall<String> thriftCall = new RetryableThriftCall<>(1000, + RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<String>() { @Override - public Void execute() throws TException { + public String execute() throws TException { try { coordinatorClient.compareAndSet(null, getCoordinatorClient()); coordinatorClient.get().compactionCompleted(job.getExternalCompactionId(), stats); - return null; + return ""; } catch (TException e) { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); throw e; @@ -379,7 +378,6 @@ public class Compactor extends AbstractServer final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8)); final TableConfiguration tConfig = getContext().getTableConfiguration(tableId); final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile())); - final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder); final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>(); job.getFiles().forEach(f -> { @@ -392,20 +390,22 @@ public class Compactor extends AbstractServer job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - org.apache.accumulo.server.compaction.Compactor compactor = - new org.apache.accumulo.server.compaction.Compactor(getContext(), - KeyExtent.fromThrift(job.getExtent()), files, outputFile, - job.isPropagateDeletes(), cenv, iters, tConfig); - - LOG.info("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - CompactionStats cs = new CompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - jobHolder.setStats(cs); + try (CompactionEnvironment cenv = new CompactionEnvironment(getContext(), jobHolder)) { + org.apache.accumulo.server.compaction.Compactor compactor = + new 
org.apache.accumulo.server.compaction.Compactor(getContext(), + KeyExtent.fromThrift(job.getExtent()), files, outputFile, + job.isPropagateDeletes(), cenv, iters, tConfig); + + LOG.info("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); + CompactionStats cs = new CompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + jobHolder.setStats(cs); + } LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); // Update state when completed updateCompactionState(job, CompactionState.SUCCEEDED, @@ -504,28 +504,40 @@ public class Compactor extends AbstractServer } } } - try { - compactionThread.join(); - this.updateCompactionCompleted(job, jobHolder.getStats()); - } catch (InterruptedException e) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to finish, cancelling job", - e); + compactionThread.join(); + + if (compactionThread.isInterrupted()) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); + try { + updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled"); + // CBUG Might need to call updateCompactionCompleted or the TServer will not + // get notified that the compaction was cancelled + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } + } else if (err.get() != null) { try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + updateCompactionState(job, CompactionState.FAILED, + "Compaction failed due to: " + err.get().getMessage()); + // CBUG Might need to call updateCompactionCompleted or the TServer will not + // get notified that the compaction failed + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure.", e); } - } 
catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", e); + } else { try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + this.updateCompactionCompleted(job, jobHolder.getStats()); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } } } - } catch (InterruptedException e1) { LOG.error( "Compactor thread was interrupted waiting for compaction to start, cancelling job", @@ -537,24 +549,6 @@ public class Compactor extends AbstractServer } } - if (compactionThread.isInterrupted()) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled"); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } - } - - Throwable thrown = err.get(); - if (thrown != null) { - try { - updateCompactionState(job, CompactionState.FAILED, - "Compaction failed due to: " + thrown.getMessage()); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure.", e); - } - } } } catch (Exception e) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index f640e37..d8cebd1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -1687,8 +1687,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe return extCompaction.toThrift(); } - // CBUG 
thrift may not support null return types https://thrift.apache.org/docs/features.html - return null; + return new TExternalCompactionJob(); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index e2b04d9..3c02127 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -47,6 +47,8 @@ public class ExternalCompactionJob { private CompactionKind kind; private List<IteratorSetting> iters; + public ExternalCompactionJob() {} + public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes, TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId, long priority, CompactionKind kind, List<IteratorSetting> iters) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index ecd7083..b9439a7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -771,9 +771,11 @@ public class CompactableImpl implements Compactable { TabletLogger.compacted(getExtent(), cInfo.job, metaFile); } catch (Exception e) { metaFile = null; + log.error("Error committing external compaction {}", extCompactionId, e); throw new RuntimeException(e); } finally { completeCompaction(cInfo.job, cInfo.jobFiles, metaFile); + log.debug("Completed commit of external compaction{}", extCompactionId); } } else { log.debug("Ignoring request to commit external compaction that is unknown {}",