This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8384b9be6a Fate Op Command Updates and Tests (#4350) 8384b9be6a is described below commit 8384b9be6abe786ad855d5457eba6bceb3868025 Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Wed Mar 13 14:07:33 2024 -0400 Fate Op Command Updates and Tests (#4350) Changes: - Update summary FateOpsCommand - Prints the full FateId, not just the long - Option to filter based on FateInstanceType - Option to filter based on FateId - Originally, the summary command printed all Fate transactions, so it seemed useful to optionally allow filtering by specific Fate transactions, as print allows - Update print FateOpsCommand - Prints the full FateId, not just the long - Option to filter based on FateInstanceType - Option to filter based on FateId - Originally, print could filter by long id, but with the replacement of the long transaction id by FateId, this had to be updated. - Instead of receiving "<hex id>" or "FATE[<hex id>]" on the cmd line, it now expects a FateId - Update cancel, delete, and fail FateOpsCommand - Now work with both ZooStore and AccumuloStore by taking the full FateId on the command line and determining the store based on the FateInstanceType of the FateId - Added tests for all 5 FateOpsCommands (FateOpsCommandsIT) - Tests using both AccumuloStore (AccumuloFateOpsCommandsIT) and ZooStore (ZookeeperFateOpsCommandsIT) - Deleted FateSummaryIT, moved the test to FateOpsCommandsIT * Changes: - Wait for condition after stopping ServerType.COMPACTOR instead of sleep - Cleaned up tests to use fewer assertions - Added attempts to --delete and --fail a transaction while the Manager is still alive in testFateDeleteCommand() and testFateFailCommand() --- .../org/apache/accumulo/core/fate/AdminUtil.java | 239 +++++---- .../org/apache/accumulo/core/fate/FateTxId.java | 8 - .../org/apache/accumulo/server/util/Admin.java | 110 ++-- .../server/util/fateCommand/FateSummaryReport.java | 34 +- 
.../server/util/fateCommand/FateTxnDetails.java | 24 +- .../server/util/fateCommand/SummaryReportTest.java | 11 +- .../server/util/fateCommand/TxnDetailsTest.java | 7 +- .../manager/metrics/fate/FateMetricValues.java | 2 +- .../org/apache/accumulo/test/FateSummaryIT.java | 156 ------ .../accumulo/test/fate/FateOpsCommandsIT.java | 564 +++++++++++++++++++++ .../fate/accumulo/AccumuloFateOpsCommandsIT.java | 32 ++ .../fate/zookeeper/ZookeeperFateOpsCommandsIT.java | 37 ++ .../test/functional/FateConcurrencyIT.java | 14 +- .../test/functional/FunctionalTestUtils.java | 2 +- 14 files changed, 888 insertions(+), 352 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 2a436b3444..c18defb1ac 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -46,7 +46,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; -import org.apache.accumulo.core.util.FastFormat; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +74,7 @@ public class AdminUtil<T> { */ public static class TransactionStatus { - private final long txid; + private final FateId fateId; private final FateInstanceType instanceType; private final TStatus status; private final String txName; @@ -84,10 +83,10 @@ public class AdminUtil<T> { private final String top; private final long timeCreated; - private TransactionStatus(Long tid, FateInstanceType instanceType, TStatus status, + private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status, String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) { - 
this.txid = tid; + this.fateId = fateId; this.instanceType = instanceType; this.status = status; this.txName = txName; @@ -102,8 +101,8 @@ public class AdminUtil<T> { * @return This fate operations transaction id, formatted in the same way as FATE transactions * are in the Accumulo logs. */ - public String getTxid() { - return FastFormat.toHexString(txid); + public FateId getFateId() { + return fateId; } public FateInstanceType getInstanceType() { @@ -161,32 +160,14 @@ public class AdminUtil<T> { public static class FateStatus { private final List<TransactionStatus> transactions; - private final Map<String,List<String>> danglingHeldLocks; - private final Map<String,List<String>> danglingWaitingLocks; - - /** - * Convert FATE transactions IDs in keys of map to format that used in printing and logging FATE - * transactions ids. This is done so that if the map is printed, the output can be used to - * search Accumulo's logs. - */ - private static Map<String,List<String>> convert(Map<Long,List<String>> danglocks) { - if (danglocks.isEmpty()) { - return Collections.emptyMap(); - } - - Map<String,List<String>> ret = new HashMap<>(); - for (Entry<Long,List<String>> entry : danglocks.entrySet()) { - ret.put(FastFormat.toHexString(entry.getKey()), - Collections.unmodifiableList(entry.getValue())); - } - return Collections.unmodifiableMap(ret); - } + private final Map<FateId,List<String>> danglingHeldLocks; + private final Map<FateId,List<String>> danglingWaitingLocks; private FateStatus(List<TransactionStatus> transactions, - Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) { + Map<FateId,List<String>> danglingHeldLocks, Map<FateId,List<String>> danglingWaitingLocks) { this.transactions = Collections.unmodifiableList(transactions); - this.danglingHeldLocks = convert(danglingHeldLocks); - this.danglingWaitingLocks = convert(danglingWaitingLocks); + this.danglingHeldLocks = danglingHeldLocks; + this.danglingWaitingLocks = 
danglingWaitingLocks; } public List<TransactionStatus> getTransactions() { @@ -194,91 +175,95 @@ public class AdminUtil<T> { } /** - * Get locks that are held by non existent FATE transactions. These are table or namespace + * Get locks that are held by non-existent FATE transactions. These are table or namespace * locks. * * @return map where keys are transaction ids and values are a list of table IDs and/or * namespace IDs. The transaction IDs are in the same format as transaction IDs in the * Accumulo logs. */ - public Map<String,List<String>> getDanglingHeldLocks() { + public Map<FateId,List<String>> getDanglingHeldLocks() { return danglingHeldLocks; } /** - * Get locks that are waiting to be acquired by non existent FATE transactions. These are table + * Get locks that are waiting to be acquired by non-existent FATE transactions. These are table * or namespace locks. * * @return map where keys are transaction ids and values are a list of table IDs and/or * namespace IDs. The transaction IDs are in the same format as transaction IDs in the * Accumulo logs. */ - public Map<String,List<String>> getDanglingWaitingLocks() { + public Map<FateId,List<String>> getDanglingWaitingLocks() { return danglingWaitingLocks; } } /** - * Returns a list of the FATE transactions, optionally filtered by transaction id and status. This - * method does not process lock information, if lock information is desired, use - * {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet)} + * Returns a list of the FATE transactions, optionally filtered by fate id, status, and fate + * instance type. This method does not process lock information, if lock information is desired, + * use {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet, EnumSet)} * * @param fateStores read-only fate stores - * @param filterTxid filter results to include for provided transaction ids. 
- * @param filterStatus filter results to include only provided status types + * @param fateIdFilter filter results to include only provided fate transaction ids + * @param statusFilter filter results to include only provided status types + * @param typesFilter filter results to include only provided fate instance types * @return list of FATE transactions that match filter criteria */ public List<TransactionStatus> getTransactionStatus( - Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, Set<Long> filterTxid, - EnumSet<TStatus> filterStatus) { + Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, Set<FateId> fateIdFilter, + EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter) { - FateStatus status = getTransactionStatus(fateStores, filterTxid, filterStatus, - Collections.<Long,List<String>>emptyMap(), Collections.<Long,List<String>>emptyMap()); + FateStatus status = getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, + Collections.<FateId,List<String>>emptyMap(), Collections.<FateId,List<String>>emptyMap()); return status.getTransactions(); } /** * Get the FATE transaction status and lock information stored in zookeeper, optionally filtered - * by transaction id and filter status. + * by fate id, status, and fate instance type * * @param zs read-only zoostore * @param zk zookeeper reader. * @param lockPath the zookeeper path for locks - * @param filterTxid filter results to include for provided transaction ids. - * @param filterStatus filter results to include only provided status types + * @param fateIdFilter filter results to include only provided fate transaction ids + * @param statusFilter filter results to include only provided status types + * @param typesFilter filter results to include only provided fate instance types * @return a summary container of the fate transactions. * @throws KeeperException if zookeeper exception occurs * @throws InterruptedException if process is interrupted. 
*/ public FateStatus getStatus(ReadOnlyFateStore<T> zs, ZooReader zk, - ServiceLock.ServiceLockPath lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus) - throws KeeperException, InterruptedException { - Map<Long,List<String>> heldLocks = new HashMap<>(); - Map<Long,List<String>> waitingLocks = new HashMap<>(); + ServiceLock.ServiceLockPath lockPath, Set<FateId> fateIdFilter, EnumSet<TStatus> statusFilter, + EnumSet<FateInstanceType> typesFilter) throws KeeperException, InterruptedException { + Map<FateId,List<String>> heldLocks = new HashMap<>(); + Map<FateId,List<String>> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(Map.of(FateInstanceType.META, zs), filterTxid, filterStatus, - heldLocks, waitingLocks); + return getTransactionStatus(Map.of(FateInstanceType.META, zs), fateIdFilter, statusFilter, + typesFilter, heldLocks, waitingLocks); } - public FateStatus getStatus(ReadOnlyFateStore<T> as, Set<Long> filterTxid, - EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException { + public FateStatus getStatus(ReadOnlyFateStore<T> as, Set<FateId> fateIdFilter, + EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter) + throws KeeperException, InterruptedException { - return getTransactionStatus(Map.of(FateInstanceType.USER, as), filterTxid, filterStatus, - new HashMap<>(), new HashMap<>()); + return getTransactionStatus(Map.of(FateInstanceType.USER, as), fateIdFilter, statusFilter, + typesFilter, new HashMap<>(), new HashMap<>()); } public FateStatus getStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk, - ServiceLock.ServiceLockPath lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus) - throws KeeperException, InterruptedException { - Map<Long,List<String>> heldLocks = new HashMap<>(); - Map<Long,List<String>> waitingLocks = new HashMap<>(); + ServiceLock.ServiceLockPath lockPath, Set<FateId> fateIdFilter, EnumSet<TStatus> 
statusFilter, + EnumSet<FateInstanceType> typesFilter) throws KeeperException, InterruptedException { + Map<FateId,List<String>> heldLocks = new HashMap<>(); + Map<FateId,List<String>> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(fateStores, filterTxid, filterStatus, heldLocks, waitingLocks); + return getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, heldLocks, + waitingLocks); } /** @@ -292,7 +277,7 @@ public class AdminUtil<T> { * @throws InterruptedException if thread interrupt detected while processing. */ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath, - final Map<Long,List<String>> heldLocks, final Map<Long,List<String>> waitingLocks) + final Map<FateId,List<String>> heldLocks, final Map<FateId,List<String>> waitingLocks) throws KeeperException, InterruptedException { // stop with exception if lock ids cannot be retrieved from zookeeper @@ -312,13 +297,15 @@ public class AdminUtil<T> { for (String node : lockNodes) { try { byte[] data = zk.getData(lockPath + "/" + id + "/" + node); - String[] lda = new String(data, UTF_8).split(":"); + // Example data: "READ:<FateId>". 
FateId contains ':' hence the limit of 2 + String[] lda = new String(data, UTF_8).split(":", 2); + FateId fateId = FateId.from(lda[1]); if (lda[0].charAt(0) == 'W') { sawWriteLock = true; } - Map<Long,List<String>> locks; + Map<FateId,List<String>> locks; if (pos == 0) { locks = heldLocks; @@ -328,8 +315,7 @@ public class AdminUtil<T> { locks = waitingLocks; } - locks.computeIfAbsent(Long.parseLong(lda[1], 16), k -> new ArrayList<>()) - .add(lda[0].charAt(0) + ":" + id); + locks.computeIfAbsent(fateId, k -> new ArrayList<>()).add(lda[0].charAt(0) + ":" + id); } catch (Exception e) { log.error("{}", e.getMessage(), e); @@ -355,17 +341,20 @@ public class AdminUtil<T> { * Returns fate status, possibly filtered * * @param fateStores read-only access to populated transaction stores. - * @param filterTxid Optional. List of transactions to filter results - if null, all transactions - * are returned - * @param filterStatus Optional. List of status types to filter results - if null, all + * @param fateIdFilter Optional. List of transactions to filter results - if null, all + * transactions are returned + * @param statusFilter Optional. List of status types to filter results - if null, all * transactions are returned. + * @param typesFilter Optional. List of fate instance types to filter results - if null, all + * transactions are returned * @param heldLocks populated list of locks held by transaction - or an empty map if none. * @param waitingLocks populated list of locks held by transaction - or an empty map if none. 
* @return current fate and lock status */ private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, - Set<Long> filterTxid, EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks, - Map<Long,List<String>> waitingLocks) { + Set<FateId> fateIdFilter, EnumSet<TStatus> statusFilter, + EnumSet<FateInstanceType> typesFilter, Map<FateId,List<String>> heldLocks, + Map<FateId,List<String>> waitingLocks) { final List<TransactionStatus> statuses = new ArrayList<>(); fateStores.forEach((type, store) -> { @@ -376,15 +365,13 @@ public class AdminUtil<T> { String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - List<String> hlocks = heldLocks.remove(fateId.getTid()); + List<String> hlocks = heldLocks.remove(fateId); if (hlocks == null) { hlocks = Collections.emptyList(); } - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - List<String> wlocks = waitingLocks.remove(fateId.getTid()); + List<String> wlocks = waitingLocks.remove(fateId); if (wlocks == null) { wlocks = Collections.emptyList(); @@ -400,10 +387,10 @@ public class AdminUtil<T> { long timeCreated = txStore.timeCreated(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (includeByStatus(status, filterStatus) && includeByTxid(fateId.getTid(), filterTxid)) { - statuses.add(new TransactionStatus(fateId.getTid(), type, status, txName, hlocks, - wlocks, top, timeCreated)); + if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter) + && includeByInstanceType(fateId.getType(), typesFilter)) { + statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top, + timeCreated)); } }); } @@ -411,28 +398,35 @@ public class AdminUtil<T> { return new FateStatus(statuses, heldLocks, waitingLocks); } - private boolean includeByStatus(TStatus status, EnumSet<TStatus> filterStatus) { - return (filterStatus == null) || filterStatus.contains(status); + private boolean includeByStatus(TStatus 
status, EnumSet<TStatus> statusFilter) { + return statusFilter == null || statusFilter.isEmpty() || statusFilter.contains(status); + } + + private boolean includeByFateId(FateId fateId, Set<FateId> fateIdFilter) { + return fateIdFilter == null || fateIdFilter.isEmpty() || fateIdFilter.contains(fateId); } - private boolean includeByTxid(Long tid, Set<Long> filterTxid) { - return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid); + private boolean includeByInstanceType(FateInstanceType type, + EnumSet<FateInstanceType> typesFilter) { + return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type); } public void printAll(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException { - print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null); + print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); } public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk, - ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<Long> filterTxid, - EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException { - FateStatus fateStatus = getStatus(fateStores, zk, tableLocksPath, filterTxid, filterStatus); + ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<FateId> fateIdFilter, + EnumSet<TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter) + throws KeeperException, InterruptedException { + FateStatus fateStatus = + getStatus(fateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); for (TransactionStatus txStatus : fateStatus.getTransactions()) { fmt.format( - "%-15s txid: %s status: %-18s locked: %-15s locking: %-15s op: %-15s created: %s%n", - txStatus.getTxName(), txStatus.getTxid(), txStatus.getStatus(), txStatus.getHeldLocks(), + "%-15s fateId: %s status: %-18s locked: %-15s locking: %-15s op: %-15s created: 
%s%n", + txStatus.getTxName(), txStatus.getFateId(), txStatus.getStatus(), txStatus.getHeldLocks(), txStatus.getWaitingLocks(), txStatus.getTop(), txStatus.getTimeCreatedFormatted()); } fmt.format(" %s transactions", fateStatus.getTransactions().size()); @@ -440,38 +434,40 @@ public class AdminUtil<T> { if (!fateStatus.getDanglingHeldLocks().isEmpty() || !fateStatus.getDanglingWaitingLocks().isEmpty()) { fmt.format("%nThe following locks did not have an associated FATE operation%n"); - for (Entry<String,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet()) { - fmt.format("txid: %s locked: %s%n", entry.getKey(), entry.getValue()); + for (Entry<FateId,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet()) { + fmt.format("fateId: %s locked: %s%n", entry.getKey(), entry.getValue()); } - for (Entry<String,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet()) { - fmt.format("txid: %s locking: %s%n", entry.getKey(), entry.getValue()); + for (Entry<FateId,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet()) { + fmt.format("fateId: %s locking: %s%n", entry.getKey(), entry.getValue()); } } } - public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath path, - String txidStr) { + public boolean prepDelete(Map<FateInstanceType,FateStore<T>> stores, ZooReaderWriter zk, + ServiceLockPath path, String fateIdStr) { if (!checkGlobalLock(zk, path)) { return false; } - long txid; + FateId fateId; try { - txid = Long.parseLong(txidStr, 16); - } catch (NumberFormatException nfe) { - System.out.printf("Invalid transaction ID format: %s%n", txidStr); + fateId = FateId.from(fateIdStr); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); return false; } boolean state = false; - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - FateId fateId = FateId.from(FateInstanceType.META, txid); - FateTxStore<T> txStore = zs.reserve(fateId); + + // determine which store to use + FateStore<T> store = 
stores.get(fateId.getType()); + + FateTxStore<T> txStore = store.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); + System.out.println("Invalid transaction ID: " + fateId); break; case SUBMITTED: @@ -480,7 +476,7 @@ public class AdminUtil<T> { case FAILED: case FAILED_IN_PROGRESS: case SUCCESSFUL: - System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); + System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); txStore.delete(); state = true; break; @@ -491,45 +487,47 @@ public class AdminUtil<T> { return state; } - public boolean prepFail(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath, - String txidStr) { + public boolean prepFail(Map<FateInstanceType,FateStore<T>> stores, ZooReaderWriter zk, + ServiceLockPath zLockManagerPath, String fateIdStr) { if (!checkGlobalLock(zk, zLockManagerPath)) { return false; } - long txid; + FateId fateId; try { - txid = Long.parseLong(txidStr, 16); - } catch (NumberFormatException nfe) { - System.out.printf("Invalid transaction ID format: %s%n", txidStr); + fateId = FateId.from(fateIdStr); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); return false; } boolean state = false; - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - FateId fateId = FateId.from(FateInstanceType.META, txid); - FateTxStore<T> txStore = zs.reserve(fateId); + + // determine which store to use + FateStore<T> store = stores.get(fateId.getType()); + + FateTxStore<T> txStore = store.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); + System.out.println("Invalid fate ID: " + fateId); break; case SUBMITTED: case IN_PROGRESS: case NEW: - System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); + System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); 
txStore.setStatus(TStatus.FAILED_IN_PROGRESS); state = true; break; case SUCCESSFUL: - System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); + System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); break; case FAILED: case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); + System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); state = true; break; } @@ -540,7 +538,7 @@ public class AdminUtil<T> { return state; } - public void deleteLocks(ZooReaderWriter zk, ServiceLock.ServiceLockPath path, String txidStr) + public void deleteLocks(ZooReaderWriter zk, ServiceLock.ServiceLockPath path, String fateIdStr) throws KeeperException, InterruptedException { // delete any locks assoc w/ fate operation List<String> lockedIds = zk.getChildren(path.toString()); @@ -550,8 +548,9 @@ public class AdminUtil<T> { for (String node : lockNodes) { String lockPath = path + "/" + id + "/" + node; byte[] data = zk.getData(path + "/" + id + "/" + node); - String[] lda = new String(data, UTF_8).split(":"); - if (lda[1].equals(txidStr)) { + // Example data: "READ:<FateId>". 
FateId contains ':' hence the limit of 2 + String[] lda = new String(data, UTF_8).split(":", 2); + if (lda[1].equals(fateIdStr)) { zk.recursiveDelete(lockPath, NodeMissingPolicy.SKIP); } } @@ -563,7 +562,7 @@ public class AdminUtil<T> { + "this code is used by the fate admin shell command") public boolean checkGlobalLock(ZooReaderWriter zk, ServiceLockPath zLockManagerPath) { try { - if (ServiceLock.getLockData(zk.getZooKeeper(), zLockManagerPath) != null) { + if (ServiceLock.getLockData(zk.getZooKeeper(), zLockManagerPath).isPresent()) { System.err.println("ERROR: Manager lock is held, not running"); if (this.exitOnError) { System.exit(1); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java index 8468eda355..ad0d5740af 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java @@ -59,12 +59,4 @@ public class FateTxId { return FastFormat.toHexString(PREFIX, tid, SUFFIX); } - public static long parseTidFromUserInput(String s) { - if (isFormatedTid(s)) { - return fromString(s); - } else { - return Long.parseLong(s, 16); - } - } - } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 63c91c50c1..9cbd0c8e19 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,7 +20,6 @@ package org.apache.accumulo.server.util; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.core.fate.FateTxId.parseTidFromUserInput; import java.io.BufferedWriter; import java.io.File; @@ -40,6 +39,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; 
+import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; @@ -54,8 +54,9 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; @@ -64,7 +65,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.TFateId; -import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -214,34 +214,40 @@ public class Admin implements KeywordExecutable { @Parameters(commandNames = "fate", commandDescription = "Operations performed on the Manager FaTE system.") static class FateOpsCommand { - @Parameter(description = "[<txId>...]") - List<String> txList = new ArrayList<>(); + @Parameter(description = "[<FateId>...]") + List<String> fateIdList = new ArrayList<>(); @Parameter(names = {"-c", "--cancel"}, - description = "<txId>... Cancel new or submitted FaTE transactions") + description = "<FateId>... Cancel new or submitted FaTE transactions") boolean cancel; @Parameter(names = {"-f", "--fail"}, - description = "<txId>... Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)") + description = "<FateId>... 
Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)") boolean fail; @Parameter(names = {"-d", "--delete"}, - description = "<txId>... Delete locks associated with transactions (Requires Manager to be down)") + description = "<FateId>... Delete locks associated with transactions (Requires Manager to be down)") boolean delete; @Parameter(names = {"-p", "--print", "-print", "-l", "--list", "-list"}, - description = "[<txId>...] Print information about FaTE transactions. Print only the 'txId's specified or print all transactions if empty. Use -s to only print certain states.") + description = "[<FateId>...] Print information about FaTE transactions. Print only the FateId's specified or print all transactions if empty. Use -s to only print those with certain states. Use -t to only print those with certain FateInstanceTypes.") boolean print; - @Parameter(names = "--summary", description = "Print a summary of all FaTE transactions") + @Parameter(names = "--summary", + description = "[<FateId>...] Print a summary of FaTE transactions. Print only the FateId's specified or print all transactions if empty. Use -s to only print those with certain states. Use -t to only print those with certain FateInstanceTypes. Use -j to print the transactions in json.") boolean summarize; - @Parameter(names = {"-j", "--json"}, description = "Print transactions in json") + @Parameter(names = {"-j", "--json"}, + description = "Print transactions in json. Only useful for --summary command.") boolean printJson; @Parameter(names = {"-s", "--state"}, description = "<state>... Print transactions in the state(s) {NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL}") List<String> states = new ArrayList<>(); + + @Parameter(names = {"-t", "--type"}, + description = "<type>... 
Print transactions of fate instance type(s) {USER, META}") + List<String> instanceTypes = new ArrayList<>(); } public static void main(String[] args) { @@ -767,39 +773,43 @@ public class Admin implements KeywordExecutable { ZooReaderWriter zk = context.getZooReaderWriter(); ZooStore<Admin> zs = new ZooStore<>(fateZkPath, zk); AccumuloStore<Admin> as = new AccumuloStore<>(context); - Map<FateInstanceType,ReadOnlyFateStore<Admin>> fateStores = + Map<FateInstanceType,FateStore<Admin>> fateStores = + Map.of(FateInstanceType.META, zs, FateInstanceType.USER, as); + Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = Map.of(FateInstanceType.META, zs, FateInstanceType.USER, as); if (fateOpsCommand.cancel) { - cancelSubmittedFateTxs(context, fateOpsCommand.txList); + cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); } else if (fateOpsCommand.fail) { - for (String txid : fateOpsCommand.txList) { - if (!admin.prepFail(zs, zk, zLockManagerPath, txid)) { - throw new AccumuloException("Could not fail transaction: " + txid); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) { + throw new AccumuloException("Could not fail transaction: " + fateIdStr); } } } else if (fateOpsCommand.delete) { - for (String txid : fateOpsCommand.txList) { - if (!admin.prepDelete(zs, zk, zLockManagerPath, txid)) { - throw new AccumuloException("Could not delete transaction: " + txid); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) { + throw new AccumuloException("Could not delete transaction: " + fateIdStr); } - admin.deleteLocks(zk, zTableLocksPath, txid); + admin.deleteLocks(zk, zTableLocksPath, fateIdStr); } } if (fateOpsCommand.print) { - final Set<Long> sortedTxs = new TreeSet<>(); - fateOpsCommand.txList.forEach(s -> sortedTxs.add(parseTidFromUserInput(s))); + final Set<FateId> fateIdFilter = new TreeSet<>(); + 
fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); EnumSet<ReadOnlyFateStore.TStatus> statusFilter = getCmdLineStatusFilters(fateOpsCommand.states); - admin.print(fateStores, zk, zTableLocksPath, new Formatter(System.out), sortedTxs, - statusFilter); + EnumSet<FateInstanceType> typesFilter = + getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); + admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), fateIdFilter, + statusFilter, typesFilter); // print line break at the end System.out.println(); } if (fateOpsCommand.summarize) { - summarizeFateTx(context, fateOpsCommand, admin, fateStores, zTableLocksPath); + summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); } } @@ -808,35 +818,33 @@ public class Admin implements KeywordExecutable { throw new IllegalArgumentException( "Can only perform one of the following at a time: cancel, fail or delete."); } - if ((cmd.cancel || cmd.fail || cmd.delete) && cmd.txList.isEmpty()) { + if ((cmd.cancel || cmd.fail || cmd.delete) && cmd.fateIdList.isEmpty()) { throw new IllegalArgumentException( "At least one txId required when using cancel, fail or delete"); } } - private void cancelSubmittedFateTxs(ServerContext context, List<String> txList) + private void cancelSubmittedFateTxs(ServerContext context, List<String> fateIdList) throws AccumuloException { - for (String txStr : txList) { - // TODO: We need to pass and then parse the instance type to create TFateId, - // maybe something like <type>:txid - long txid = Long.parseLong(txStr, 16); - boolean cancelled = cancelFateOperation(context, new TFateId(TFateInstanceType.META, txid)); + for (String fateIdStr : fateIdList) { + FateId fateId = FateId.from(fateIdStr); + TFateId thriftFateId = fateId.toThrift(); + boolean cancelled = cancelFateOperation(context, thriftFateId); if (cancelled) { - System.out.println("FaTE transaction " + FateTxId.formatTid(txid) - + " was cancelled or 
already completed."); + System.out.println("FaTE transaction " + fateId + " was cancelled or already completed."); } else { - System.out.println("FaTE transaction " + FateTxId.formatTid(txid) - + " was not cancelled, status may have changed."); + System.out + .println("FaTE transaction " + fateId + " was not cancelled, status may have changed."); } } } - private boolean cancelFateOperation(ClientContext context, TFateId txid) + private boolean cancelFateOperation(ClientContext context, TFateId thriftFateId) throws AccumuloException { FateService.Client client = null; try { client = ThriftClientTypes.FATE.getConnectionWithRetry(context); - return client.cancelFateOperation(TraceUtil.traceInfo(), context.rpcCreds(), txid); + return client.cancelFateOperation(TraceUtil.traceInfo(), context.rpcCreds(), thriftFateId); } catch (Exception e) { throw new AccumuloException(e); } finally { @@ -852,7 +860,7 @@ public class Admin implements KeywordExecutable { throws InterruptedException, AccumuloException, AccumuloSecurityException, KeeperException { ZooReaderWriter zk = context.getZooReaderWriter(); - var transactions = admin.getStatus(fateStores, zk, tableLocksPath, null, null); + var transactions = admin.getStatus(fateStores, zk, tableLocksPath, null, null, null); // build id map - relies on unique ids for tables and namespaces // used to look up the names of either table or namespace by id. 
@@ -867,9 +875,13 @@ public class Admin implements KeywordExecutable { } }); + Set<FateId> fateIdFilter = + cmd.fateIdList.stream().map(FateId::from).collect(Collectors.toSet()); EnumSet<ReadOnlyFateStore.TStatus> statusFilter = getCmdLineStatusFilters(cmd.states); + EnumSet<FateInstanceType> typesFilter = getCmdLineInstanceTypeFilters(cmd.instanceTypes); - FateSummaryReport report = new FateSummaryReport(idsToNameMap, statusFilter); + FateSummaryReport report = + new FateSummaryReport(idsToNameMap, fateIdFilter, statusFilter, typesFilter); // gather statistics transactions.getTransactions().forEach(report::gatherTxnStatus); @@ -892,7 +904,7 @@ public class Admin implements KeywordExecutable { /** * If provided on the command line, get the TStatus values provided. * - * @return a set of status filters, or an empty set if none provides + * @return a set of status filters, or null if none provided */ private EnumSet<ReadOnlyFateStore.TStatus> getCmdLineStatusFilters(List<String> states) { EnumSet<ReadOnlyFateStore.TStatus> statusFilter = null; @@ -904,4 +916,20 @@ public class Admin implements KeywordExecutable { } return statusFilter; } + + /** + * If provided on the command line, get the FateInstanceType values provided. 
+ * + * @return a set of fate instance types filters, or null if none provided + */ + private EnumSet<FateInstanceType> getCmdLineInstanceTypeFilters(List<String> instanceTypes) { + EnumSet<FateInstanceType> typesFilter = null; + if (!instanceTypes.isEmpty()) { + typesFilter = EnumSet.noneOf(FateInstanceType.class); + for (String instanceType : instanceTypes) { + typesFilter.add(FateInstanceType.valueOf(instanceType)); + } + } + return typesFilter; + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java index f99e36d704..2dfdc20c72 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java @@ -33,6 +33,8 @@ import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import com.google.gson.Gson; @@ -47,19 +49,27 @@ public class FateSummaryReport { // epoch millis to avoid needing gson type adapter. 
private final long reportTime = Instant.now().toEpochMilli(); + private final Set<String> fateIdFilter = new TreeSet<>(); private final Set<String> statusFilterNames = new TreeSet<>(); + private final Set<String> instanceTypesFilterNames = new TreeSet<>(); private final static Gson gson = new GsonBuilder().setPrettyPrinting().create(); // exclude from json output private final transient Map<String,String> idsToNameMap; - public FateSummaryReport(Map<String,String> idsToNameMap, - EnumSet<ReadOnlyFateStore.TStatus> statusFilter) { + public FateSummaryReport(Map<String,String> idsToNameMap, Set<FateId> fateIdFilter, + EnumSet<ReadOnlyFateStore.TStatus> statusFilter, EnumSet<FateInstanceType> typesFilter) { this.idsToNameMap = idsToNameMap; + if (fateIdFilter != null) { + fateIdFilter.forEach(f -> this.fateIdFilter.add(f.canonical())); + } if (statusFilter != null) { statusFilter.forEach(f -> this.statusFilterNames.add(f.name())); } + if (typesFilter != null) { + typesFilter.forEach(f -> this.instanceTypesFilterNames.add(f.name())); + } } public void gatherTxnStatus(AdminUtil.TransactionStatus txnStatus) { @@ -75,10 +85,19 @@ public class FateSummaryReport { String runningRepo = txnStatus.getTxName(); cmdCounts.merge(Objects.requireNonNullElse(runningRepo, "?"), 1, Integer::sum); + // filter transactions if provided + if (!fateIdFilter.isEmpty() && !fateIdFilter.contains(txnStatus.getFateId().canonical())) { + return; + } // filter status if provided. 
if (!statusFilterNames.isEmpty() && !statusFilterNames.contains(txnStatus.getStatus().name())) { return; } + // filter FateInstanceType if provided + if (!instanceTypesFilterNames.isEmpty() + && !instanceTypesFilterNames.contains(txnStatus.getInstanceType().name())) { + return; + } fateDetails.add(new FateTxnDetails(reportTime, txnStatus, idsToNameMap)); } @@ -102,10 +121,18 @@ public class FateSummaryReport { return reportTime; } + public Set<String> getFateIdFilter() { + return fateIdFilter; + } + public Set<String> getStatusFilterNames() { return statusFilterNames; } + public Set<String> getInstanceTypesFilterNames() { + return instanceTypesFilterNames; + } + public String toJson() { return gson.toJson(this); } @@ -142,6 +169,9 @@ public class FateSummaryReport { lines.add("\nFate transactions (oldest first):"); lines.add("Status Filters: " + (statusFilterNames.isEmpty() ? "[NONE]" : statusFilterNames.toString())); + lines.add("Fate ID Filters: " + (fateIdFilter.isEmpty() ? "[NONE]" : fateIdFilter.toString())); + lines.add("Instance Types Filters: " + + (instanceTypesFilterNames.isEmpty() ? 
"[NONE]" : instanceTypesFilterNames.toString())); lines.add(FateTxnDetails.TXN_HEADER); fateDetails.forEach(txnDetails -> lines.add(txnDetails.toString())); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java index ae7798af7a..66c12f81cd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java @@ -28,13 +28,13 @@ import org.apache.accumulo.core.fate.AdminUtil; public class FateTxnDetails implements Comparable<FateTxnDetails> { final static String TXN_HEADER = - "Running\ttxn_id\t\t\t\tStatus\t\tCommand\t\tStep (top)\t\tlocks held:(table id, name)\tlocks waiting:(table id, name)"; + "Running\tfate_id\t\t\t\tStatus\t\tCommand\t\tStep (top)\t\tlocks held:(table id, name)\tlocks waiting:(table id, name)"; private long running; private String status = "?"; private String txName = "?"; private String step = "?"; - private String txnId = "?"; + private String fateId = "?"; private List<String> locksHeld = List.of(); private List<String> locksWaiting = List.of(); @@ -71,8 +71,8 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { if (txnStatus.getTxName() != null) { txName = txnStatus.getTxName(); } - if (txnStatus.getTxid() != null) { - txnId = txnStatus.getTxid(); + if (txnStatus.getFateId() != null) { + fateId = txnStatus.getFateId().canonical(); } locksHeld = formatLockInfo(txnStatus.getHeldLocks(), idsToNameMap); locksWaiting = formatLockInfo(txnStatus.getWaitingLocks(), idsToNameMap); @@ -92,8 +92,12 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { return formattedLocks; } - public String getTxnId() { - return txnId; + public String getFateId() { + return fateId; + } + + public String getStatus() { + return status; } /** @@ -109,7 +113,7 @@ public 
class FateTxnDetails implements Comparable<FateTxnDetails> { if (v != 0) { return v; } - return txnId.compareTo(other.txnId); + return fateId.compareTo(other.fateId); } @Override @@ -122,12 +126,12 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { return false; } FateTxnDetails that = (FateTxnDetails) o; - return running == that.running && txnId.equals(that.txnId); + return running == that.running && fateId.equals(that.fateId); } @Override public int hashCode() { - return Objects.hash(running, txnId); + return Objects.hash(running, fateId); } @Override @@ -136,7 +140,7 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> { String hms = String.format("%d:%02d:%02d", elapsed.toHours(), elapsed.toMinutesPart(), elapsed.toSecondsPart()); - return hms + "\t" + txnId + "\t" + status + "\t" + txName + "\t" + step + "\theld:" + return hms + "\t" + fateId + "\t" + status + "\t" + txName + "\t" + step + "\theld:" + locksHeld.toString() + "\twaiting:" + locksWaiting.toString(); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java index bed17e92c4..4618715fbb 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -44,7 +45,7 @@ class SummaryReportTest { @Test public void blankReport() { Map<String,String> idMap = Map.of("1", "ns1", "2", "tbl1"); - FateSummaryReport report = new FateSummaryReport(idMap, null); + FateSummaryReport report = new 
FateSummaryReport(idMap, null, null, null); assertNotNull(report); assertNotEquals(0, report.getReportTime()); @@ -53,6 +54,8 @@ class SummaryReportTest { assertEquals(Map.of(), report.getStepCounts()); assertEquals(Set.of(), report.getFateDetails()); assertEquals(Set.of(), report.getStatusFilterNames()); + assertEquals(Set.of(), report.getInstanceTypesFilterNames()); + assertEquals(Set.of(), report.getFateIdFilter()); assertNotNull(report.toJson()); assertNotNull(report.formatLines()); @@ -70,13 +73,13 @@ class SummaryReportTest { expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn(null).anyTimes(); expect(status1.getTxName()).andReturn(null).anyTimes(); - expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); + expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:abcdabcd")).anyTimes(); expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes(); expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes(); replay(status1); Map<String,String> idMap = Map.of("1", "ns1", "2", ""); - FateSummaryReport report = new FateSummaryReport(idMap, null); + FateSummaryReport report = new FateSummaryReport(idMap, null, null, null); report.gatherTxnStatus(status1); assertNotNull(report); @@ -85,6 +88,8 @@ class SummaryReportTest { assertEquals(Map.of("?", 1), report.getCmdCounts()); assertEquals(Map.of("?", 1), report.getStepCounts()); assertEquals(Set.of(), report.getStatusFilterNames()); + assertEquals(Set.of(), report.getInstanceTypesFilterNames()); + assertEquals(Set.of(), report.getFateIdFilter()); assertNotNull(report.toJson()); assertNotNull(report.formatLines()); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java index 48e5c59845..35be83fce8 100644 --- 
a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java @@ -33,6 +33,7 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AdminUtil; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -53,7 +54,7 @@ class TxnDetailsTest { expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); expect(status1.getTxName()).andReturn("runningTx1").anyTimes(); - expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); + expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:abcdabcd")).anyTimes(); expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes(); expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes(); @@ -62,7 +63,7 @@ class TxnDetailsTest { expect(status2.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status2.getTop()).andReturn("step2").anyTimes(); expect(status2.getTxName()).andReturn("runningTx2").anyTimes(); - expect(status2.getTxid()).andReturn("123456789").anyTimes(); + expect(status2.getFateId()).andReturn(FateId.from("FATE:USER:123456789")).anyTimes(); expect(status2.getHeldLocks()).andReturn(List.of()).anyTimes(); expect(status2.getWaitingLocks()).andReturn(List.of()).anyTimes(); @@ -96,7 +97,7 @@ class TxnDetailsTest { expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); expect(status1.getTxName()).andReturn("runningTx").anyTimes(); - expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); + expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:abcdabcd")).anyTimes(); // incomplete lock info (W unknown ns id, no table)) 
expect(status1.getHeldLocks()).andReturn(List.of("R:1", "R:2", "W:a")).anyTimes(); // blank names diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 4f99d01586..841cf61dd1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -111,7 +111,7 @@ class FateMetricValues { try { List<AdminUtil.TransactionStatus> currFates = - admin.getTransactionStatus(Map.of(FateInstanceType.META, zooStore), null, null); + admin.getTransactionStatus(Map.of(FateInstanceType.META, zooStore), null, null, null); builder.withCurrentFateOps(currFates.size()); diff --git a/test/src/main/java/org/apache/accumulo/test/FateSummaryIT.java b/test/src/main/java/org/apache/accumulo/test/FateSummaryIT.java deleted file mode 100644 index d868e5e8bb..0000000000 --- a/test/src/main/java/org/apache/accumulo/test/FateSummaryIT.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.admin.NewTableConfiguration; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.server.util.Admin; -import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.accumulo.test.functional.ReadWriteIT; -import org.apache.accumulo.test.functional.SlowIterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -public class FateSummaryIT extends ConfigurableMacBase { - - @Override - protected Duration defaultTimeout() { - return Duration.ofMinutes(2); - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {} - - @Test - public void testFateSummaryCommandWithSlowCompaction() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - String namespace = "ns1"; - final String table = namespace + "." 
+ getUniqueNames(1)[0]; - client.namespaceOperations().create(namespace); - - SortedSet<Text> splits = new TreeSet<Text>(); - splits.add(new Text("h")); - splits.add(new Text("m")); - splits.add(new Text("r")); - splits.add(new Text("w")); - IteratorSetting is = new IteratorSetting(1, SlowIterator.class); - is.addOption("sleepTime", "10000"); - - NewTableConfiguration cfg = new NewTableConfiguration(); - cfg.withSplits(splits); - cfg.attachIterator(is, EnumSet.of(IteratorScope.majc)); - client.tableOperations().create(table, cfg); - - ReadWriteIT.ingest(client, 10, 10, 10, 0, table); - client.tableOperations().flush(table); - - // validate blank report, compactions have not started yet - ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW", "-s", - "IN_PROGRESS", "-s", "FAILED"); - assertEquals(0, p.getProcess().waitFor()); - String result = p.readStdOut(); - result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); - FateSummaryReport report = FateSummaryReport.fromJson(result); - assertNotNull(report); - assertNotEquals(0, report.getReportTime()); - Set<String> expected = new HashSet<>(); - expected.add("FAILED"); - expected.add("IN_PROGRESS"); - expected.add("NEW"); - assertEquals(expected, report.getStatusFilterNames()); - assertEquals(Map.of(), report.getStatusCounts()); - assertEquals(Map.of(), report.getStepCounts()); - assertEquals(Map.of(), report.getCmdCounts()); - - // create Fate transactions - client.tableOperations().compact(table, null, null, false, false); - client.tableOperations().compact(table, null, null, false, false); - - // validate no filters - p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); - assertEquals(0, p.getProcess().waitFor()); - result = p.readStdOut(); - result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); - report = FateSummaryReport.fromJson(result); - assertNotNull(report); - assertNotEquals(0, report.getReportTime()); - 
assertEquals(Set.of(), report.getStatusFilterNames()); - assertFalse(report.getStatusCounts().isEmpty()); - assertFalse(report.getStepCounts().isEmpty()); - assertFalse(report.getCmdCounts().isEmpty()); - assertEquals(2, report.getFateDetails().size()); - ArrayList<String> txns = new ArrayList<>(); - report.getFateDetails().forEach((d) -> { - txns.add(d.getTxnId()); - }); - assertEquals(2, txns.size()); - - // validate tx ids - p = getCluster().exec(Admin.class, "fate", txns.get(0), txns.get(1), "--summary", "-j"); - assertEquals(0, p.getProcess().waitFor()); - result = p.readStdOut(); - result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); - report = FateSummaryReport.fromJson(result); - assertNotNull(report); - assertNotEquals(0, report.getReportTime()); - assertEquals(Set.of(), report.getStatusFilterNames()); - assertFalse(report.getStatusCounts().isEmpty()); - assertFalse(report.getStepCounts().isEmpty()); - assertFalse(report.getCmdCounts().isEmpty()); - assertEquals(2, report.getFateDetails().size()); - - // validate filter by including only FAILED transactions, should be none - p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "FAILED"); - assertEquals(0, p.getProcess().waitFor()); - result = p.readStdOut(); - result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); - report = FateSummaryReport.fromJson(result); - assertNotNull(report); - assertNotEquals(0, report.getReportTime()); - assertEquals(Set.of("FAILED"), report.getStatusFilterNames()); - assertFalse(report.getStatusCounts().isEmpty()); - assertFalse(report.getStepCounts().isEmpty()); - assertFalse(report.getCmdCounts().isEmpty()); - assertEquals(0, report.getFateDetails().size()); - - } - } - -} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java new file mode 100644 index 0000000000..b716d12d5f --- /dev/null +++ 
b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import 
org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +public abstract class FateOpsCommandsIT extends ConfigurableMacBase + implements FateTestRunner<FateTestRunner.TestEnv> { + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + // Used for tests that shutdown the manager so the sleep time after shutdown isn't too long + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); + } + + @Test + public void testFateSummaryCommand() throws Exception { + executeTest(this::testFateSummaryCommand); + } + + protected void testFateSummaryCommand(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. 
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // validate blank report, no transactions have started + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "NEW", "-s", + "IN_PROGRESS", "-s", "FAILED"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + FateSummaryReport report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertTrue(report.getStatusCounts().isEmpty()); + assertTrue(report.getStepCounts().isEmpty()); + assertTrue(report.getCmdCounts().isEmpty()); + assertEquals(Set.of("FAILED", "IN_PROGRESS", "NEW"), report.getStatusFilterNames()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + assertEquals(0, report.getFateDetails().size()); + + // create Fate transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + List<String> fateIdsStarted = List.of(fateId1.canonical(), fateId2.canonical()); + + // validate no filters + p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + assertEquals(2, report.getFateDetails().size()); + ArrayList<String> 
fateIdsFromResult1 = new ArrayList<>(); + report.getFateDetails().forEach((d) -> { + fateIdsFromResult1.add(d.getFateId()); + }); + assertTrue(fateIdsFromResult1.containsAll(fateIdsStarted)); + + // validate filtering by both transactions + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), + "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(2, report.getFateIdFilter().size()); + assertTrue(report.getFateIdFilter().containsAll(fateIdsStarted)); + assertEquals(2, report.getFateDetails().size()); + ArrayList<String> fateIdsFromResult2 = new ArrayList<>(); + report.getFateDetails().forEach((d) -> { + fateIdsFromResult2.add(d.getFateId()); + }); + assertTrue(fateIdsFromResult2.containsAll(fateIdsStarted)); + + // validate filtering by just one transaction + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertEquals(1, report.getFateIdFilter().size()); + 
assertTrue(report.getFateIdFilter().contains(fateId1.canonical())); + assertEquals(1, report.getFateDetails().size()); + ArrayList<String> fateIdsFromResult3 = new ArrayList<>(); + report.getFateDetails().forEach((d) -> { + fateIdsFromResult3.add(d.getFateId()); + }); + assertTrue(fateIdsFromResult3.contains(fateId1.canonical())); + + // validate status filter by including only FAILED transactions, should be none + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-s", "FAILED"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertEquals(Set.of("FAILED"), report.getStatusFilterNames()); + assertTrue(report.getInstanceTypesFilterNames().isEmpty()); + assertTrue(report.getFateIdFilter().isEmpty()); + assertEquals(0, report.getFateDetails().size()); + + // validate FateInstanceType filter by only including transactions with META filter + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", "META"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertEquals(Set.of("META"), report.getInstanceTypesFilterNames()); + assertTrue(report.getFateIdFilter().isEmpty()); + if (store.type() == FateInstanceType.META) { + assertEquals(2, report.getFateDetails().size()); 
+ ArrayList<String> fateIdsFromResult4 = new ArrayList<>(); + report.getFateDetails().forEach((d) -> { + fateIdsFromResult4.add(d.getFateId()); + }); + assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted)); + } else { // USER + assertEquals(0, report.getFateDetails().size()); + } + + // validate FateInstanceType filter by only including transactions with USER filter + p = getCluster().exec(Admin.class, "fate", "--summary", "-j", "-t", "USER"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + report = FateSummaryReport.fromJson(result); + assertNotNull(report); + assertNotEquals(0, report.getReportTime()); + assertFalse(report.getStatusCounts().isEmpty()); + assertFalse(report.getStepCounts().isEmpty()); + assertFalse(report.getCmdCounts().isEmpty()); + assertTrue(report.getStatusFilterNames().isEmpty()); + assertEquals(Set.of("USER"), report.getInstanceTypesFilterNames()); + assertTrue(report.getFateIdFilter().isEmpty()); + if (store.type() == FateInstanceType.META) { + assertEquals(0, report.getFateDetails().size()); + } else { // USER + assertEquals(2, report.getFateDetails().size()); + ArrayList<String> fateIdsFromResult4 = new ArrayList<>(); + report.getFateDetails().forEach((d) -> { + fateIdsFromResult4.add(d.getFateId()); + }); + assertTrue(fateIdsFromResult4.containsAll(fateIdsStarted)); + } + + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateSummaryCommandPlainText() throws Exception { + executeTest(this::testFateSummaryCommandPlainText); + } + + protected void testFateSummaryCommandPlainText(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. 
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), + "--summary", "-s", "NEW", "-t", store.type().name()); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + assertTrue(result.contains("Status Filters: [NEW]")); + assertTrue(result + .contains("Fate ID Filters: [" + fateId1.canonical() + ", " + fateId2.canonical() + "]") + || result.contains( + "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]")); + assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]")); + } + + @Test + public void testFatePrintCommand() throws Exception { + executeTest(this::testFatePrintCommand); + } + + protected void testFatePrintCommand(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // validate no transactions + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + assertTrue(result.contains("0 transactions")); + + // create Fate transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Get all transactions. 
Should be 2 FateIds with a NEW status + p = getCluster().exec(Admin.class, "fate", "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + Map<String,String> fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + /* + * Test filtering by States + */ + + // Filter by NEW state + p = getCluster().exec(Admin.class, "fate", "--print", "-s", "NEW"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + // Filter by FAILED state + p = getCluster().exec(Admin.class, "fate", "--print", "-s", "FAILED"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertTrue(fateIdsFromResult.isEmpty()); + + /* + * Test filtering by FateIds + */ + + // Filter by one FateId + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW"), fateIdsFromResult); + + // Filter by both FateIds + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), fateId2.canonical(), "--print"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromResult); + + /* + * Test filtering by FateInstanceType + */ + + // Test filter by USER FateInstanceType + p = getCluster().exec(Admin.class, "fate", "--print", "-t", "USER"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + if (store.type() == FateInstanceType.META) { + 
assertTrue(fateIdsFromResult.isEmpty()); + } else { // USER + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromResult); + } + + // Test filter by META FateInstanceType + p = getCluster().exec(Admin.class, "fate", "--print", "-t", "META"); + assertEquals(0, p.getProcess().waitFor()); + result = p.readStdOut(); + fateIdsFromResult = getFateIdsFromPrint(result); + if (store.type() == FateInstanceType.META) { + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromResult); + } else { // USER + assertTrue(fateIdsFromResult.isEmpty()); + } + + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateCancelCommand() throws Exception { + executeTest(this::testFateCancelCommand); + } + + protected void testFateCancelCommand(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. 
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Cancel the first transaction and ensure that it was cancelled + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--cancel"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result + .contains("transaction " + fateId1.canonical() + " was cancelled or already completed")); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateFailCommand() throws Exception { + executeTest(this::testFateFailCommand); + } + + protected void testFateFailCommand(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. 
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Attempt to --fail the transaction. Should not work as the Manager is still up + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + assertEquals(1, p.getProcess().waitFor()); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Stop MANAGER so --fail can be called + getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); + Thread.sleep(20_000); + + // Fail the first transaction and ensure that it was failed + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Failing transaction: " + fateId1)); + fateIdsFromSummary = getFateIdsFromSummary(); + assertTrue(fateIdsFromSummary + .equals(Map.of(fateId1.canonical(), "FAILED_IN_PROGRESS", fateId2.canonical(), "NEW")) + || fateIdsFromSummary + .equals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"))); + + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateDeleteCommand() throws Exception { + executeTest(this::testFateDeleteCommand); + } + + protected void testFateDeleteCommand(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate<TestEnv> fate = initializeFate(store); + // Occasionally, the summary/print cmds will see a 
COMMIT_COMPACTION transaction which was + // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes + // this issue. + getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); + Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map<String,String> fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Attempt to --delete the transaction. Should not work as the Manager is still up + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + assertEquals(1, p.getProcess().waitFor()); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Stop MANAGER so --delete can be called + getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); + Thread.sleep(20_000); + + // Delete the first transaction and ensure that it was deleted + p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Deleting transaction: " + fateId1)); + fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId2.canonical(), "NEW"), fateIdsFromSummary); + + fate.shutdown(10, TimeUnit.MINUTES); + } + + /** + * + * @param printResult the output of the --print fate command + * @return a map of each of the FateIds to their status using the output of --print + */ + private Map<String,String> getFateIdsFromPrint(String printResult) { + Map<String,String> fateIdToStatus = new HashMap<>(); + String lastFateIdSeen = null; + String[] words = 
printResult.split(" "); + for (String word : words) { + if (FateId.isFateId(word)) { + if (!fateIdToStatus.containsKey(word)) { + lastFateIdSeen = word; + } else { + log.debug( + "--print listed the same transaction more than once. This should not occur, failing"); + fail(); + } + } else if (wordIsTStatus(word)) { + fateIdToStatus.put(lastFateIdSeen, word); + } + } + return fateIdToStatus; + } + + /** + * + * @return a map of each of the FateIds to their status using the --summary command + */ + private Map<String,String> getFateIdsFromSummary() throws Exception { + ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + result = result.substring(result.indexOf("{"), result.lastIndexOf("}") + 1); + FateSummaryReport report = FateSummaryReport.fromJson(result); + assertNotNull(report); + Map<String,String> fateIdToStatus = new HashMap<>(); + report.getFateDetails().forEach((d) -> { + fateIdToStatus.put(d.getFateId(), d.getStatus()); + }); + return fateIdToStatus; + } + + private Fate<TestEnv> initializeFate(FateStore<TestEnv> store) { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); + return new Fate<>(new TestEnv(), store, Object::toString, config); + } + + private boolean wordIsTStatus(String word) { + try { + ReadOnlyFateStore.TStatus.valueOf(word); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateOpsCommandsIT.java new file mode 100644 index 0000000000..3ec774350b --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateOpsCommandsIT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.test.fate.FateOpsCommandsIT; + +public class AccumuloFateOpsCommandsIT extends FateOpsCommandsIT { + @Override + public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + testMethod.execute(new AccumuloStore<>(getCluster().getServerContext()), + getCluster().getServerContext()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateOpsCommandsIT.java new file mode 100644 index 0000000000..eed1bb244d --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateOpsCommandsIT.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.zookeeper; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.ZooStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateOpsCommandsIT; + +public class ZookeeperFateOpsCommandsIT extends FateOpsCommandsIT { + @Override + public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + ServerContext sctx = getCluster().getServerContext(); + String path = sctx.getZooKeeperRoot() + Constants.ZFATE; + ZooReaderWriter zk = sctx.getZooReaderWriter(); + testMethod.execute(new ZooStore<>(path, zk), sctx); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 325f905129..85c2e3cfb2 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -269,10 +269,10 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { Map<FateInstanceType,ReadOnlyFateStore<String>> fateStores = Map.of(FateInstanceType.META, zs, 
FateInstanceType.USER, as); - withLocks = admin.getStatus(fateStores, zk, lockPath, null, null); + withLocks = admin.getStatus(fateStores, zk, lockPath, null, null, null); // call method that does not use locks. - noLocks = admin.getTransactionStatus(fateStores, null, null); + noLocks = admin.getTransactionStatus(fateStores, null, null, null); // no zk exception, no need to retry break; @@ -304,10 +304,10 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { if (isCompaction(tx)) { - log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus()); + log.trace("Fate id: {}, status: {}", tx.getFateId(), tx.getStatus()); for (AdminUtil.TransactionStatus tx2 : noLocks) { - if (tx2.getTxid().equals(tx.getTxid())) { + if (tx2.getFateId().equals(tx.getFateId())) { matchCount++; } } @@ -358,7 +358,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk); var lockPath = ServiceLock.path(ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId); - AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, lockPath, null, null); + AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, lockPath, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); @@ -386,7 +386,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { log.trace("tid: {}", tableId); AccumuloStore<String> as = new AccumuloStore<>(context); - AdminUtil.FateStatus fateStatus = admin.getStatus(as, null, null); + AdminUtil.FateStatus fateStatus = admin.getStatus(as, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); @@ -418,7 +418,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { return false; } - log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus()); + log.trace("Fate id: {}, status: {}", tx.getFateId(), tx.getStatus()); String top = tx.getTop(); String txName = 
tx.getTxName(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 1dd8da3ebb..7d4caa1202 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -233,7 +233,7 @@ public class FunctionalTestUtils { ZooReaderWriter zk = context.getZooReaderWriter(); ZooStore<String> zs = new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk); var lockPath = ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS); - return admin.getStatus(zs, zk, lockPath, null, null); + return admin.getStatus(zs, zk, lockPath, null, null, null); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); }