DL-116: Add tool for deleting subscriber from subscription store Test Plan:
1. manually create znode for subscribers resume point 2. use the tool to delete the subscriberId Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9d467a60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9d467a60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9d467a60 Branch: refs/heads/master Commit: 9d467a60a8c2ad2c0acc73afbfb66c232baa0b46 Parents: 00be3e5 Author: Yiming Zang <yz...@twitter.com> Authored: Wed Nov 30 14:16:14 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:07:27 2016 -0800 ---------------------------------------------------------------------- .../subscription/SubscriptionsStore.java | 10 ++ .../subscription/ZKSubscriptionsStore.java | 16 ++- .../tools/DistributedLogTool.java | 119 ++++++++++++++++++- .../com/twitter/distributedlog/util/Utils.java | 38 ++++++ 4 files changed, 181 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9d467a60/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java index 9905cea..27d5c1d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java @@ -56,4 +56,14 @@ public interface SubscriptionsStore extends Closeable { */ public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition); + /** + * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the + * data stored under this subscriber will be lost. + * @param subscriberId subscriber id + * @return future represent success or failure. + * return true only if there's such subscriber and we removed it successfully. + * return false if there's no such subscriber, or we failed to remove. + */ + public Future<Boolean> deleteSubscriber(String subscriberId); + } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9d467a60/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java index fb154c1..f1e6251 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/ZKSubscriptionsStore.java @@ -20,9 +20,12 @@ package com.twitter.distributedlog.subscription; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.util.Utils; import com.twitter.util.Function; import com.twitter.util.Future; import com.twitter.util.Promise; + +import org.apache.bookkeeper.meta.ZkVersion; import org.apache.commons.lang3.tuple.Pair; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; @@ -59,7 +62,7 @@ public class ZKSubscriptionsStore implements SubscriptionsStore { ZKSubscriptionStateStore ss = subscribers.get(subscriberId); if (ss == null) { ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc, - String.format("%s/%s", zkPath, subscriberId)); + getSubscriberZKPath(subscriberId)); ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS); if (oldSS == null) { ss = newSS; @@ -75,6 +78,10 @@ public class ZKSubscriptionsStore implements SubscriptionsStore { return ss; } + private String getSubscriberZKPath(String subscriberId) { + return String.format("%s/%s", zkPath, subscriberId); + } + @Override public Future<DLSN> getLastCommitPosition(String subscriberId) { return getSubscriber(subscriberId).getLastCommitPosition(); @@ -141,6 +148,13 @@ public class ZKSubscriptionsStore implements SubscriptionsStore { } @Override + public Future<Boolean> deleteSubscriber(String subscriberId) { + subscribers.remove(subscriberId); + String path = getSubscriberZKPath(subscriberId); + return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1)); + } + + @Override public void close() throws IOException { // no-op for (SubscriptionStateStore store : subscribers.values()) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9d467a60/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index 0862d54..bed2fcd 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -105,6 +105,8 @@ import com.twitter.distributedlog.metadata.MetadataUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.util.SchedulerUtils; import com.twitter.util.Await; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; import static com.google.common.base.Charsets.UTF_8; @@ -802,7 +804,7 @@ public class DistributedLogTool extends Tool { return 0; } numThreads = Math.min(streams.size(), numThreads); - final int numStreamsPerThreads = streams.size() / numThreads; + final int numStreamsPerThreads = streams.size() / numThreads + 1; Thread[] threads = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { final int tid = i; @@ -2723,6 +2725,120 @@ public class DistributedLogTool extends Tool { } } + public static class DeleteSubscriberCommand extends PerDLCommand { + + int numThreads = 1; + String streamPrefix = null; + String subscriberId = null; + AtomicInteger streamIndex = new AtomicInteger(); + + DeleteSubscriberCommand() { + super("delete_subscriber", "Delete the subscriber in subscription store. "); + options.addOption("s", "subscriberId", true, "SubscriberId to remove from the stream"); + options.addOption("t", "threads", true, "Number of threads"); + options.addOption("ft", "filter", true, "Stream filter by prefix"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("s")) { + throw new ParseException("No subscriberId provided."); + } else { + subscriberId = cmdline.getOptionValue("s"); + } + if (cmdline.hasOption("t")) { + numThreads = Integer.parseInt(cmdline.getOptionValue("t")); + } + if (cmdline.hasOption("ft")) { + streamPrefix = cmdline.getOptionValue("ft"); + } + } + + @Override + protected String getUsage() { + return "delete_subscriber [options]"; + } + + @Override + protected int runCmd() throws Exception { + getConf().setZkAclId(getZkAclId()); + return deleteSubscriber(getFactory()); + } + + private int deleteSubscriber(final com.twitter.distributedlog.DistributedLogManagerFactory factory) throws Exception { + Collection<String> streamCollection = factory.enumerateAllLogsInNamespace(); + final List<String> streams = new ArrayList<String>(); + if (null != streamPrefix) { + for (String s : streamCollection) { + if (s.startsWith(streamPrefix)) { + streams.add(s); + } + } + } else { + streams.addAll(streamCollection); + } + if (0 == streams.size()) { + return 0; + } + System.out.println("Streams : " + streams); + if (!getForce() && !IOUtils.confirmPrompt("Do you want to delete subscriber " + + subscriberId + " for " + streams.size() + " streams ?")) { + return 0; + } + numThreads = Math.min(streams.size(), numThreads); + final int numStreamsPerThreads = streams.size() / numThreads + 1; + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + final int tid = i; + threads[i] = new Thread("RemoveSubscriberThread-" + i) { + @Override + public void run() { + try { + deleteSubscriber(factory, streams, tid, numStreamsPerThreads); + System.out.println("Thread " + tid + " finished."); + } catch (Exception e) { + System.err.println("Thread " + tid + " quits with exception : " + e.getMessage()); + } + } + }; + threads[i].start(); + } + for (int i = 0; i < numThreads; i++) { + threads[i].join(); + } + return 0; + } + + private void deleteSubscriber(com.twitter.distributedlog.DistributedLogManagerFactory factory, List<String> streams, + int tid, int numStreamsPerThreads) throws Exception { + int startIdx = tid * numStreamsPerThreads; + int endIdx = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads); + for (int i = startIdx; i < endIdx; i++) { + final String s = streams.get(i); + DistributedLogManager dlm = + factory.createDistributedLogManagerWithSharedClients(s); + final CountDownLatch countDownLatch = new CountDownLatch(1); + dlm.getSubscriptionsStore().deleteSubscriber(subscriberId) + .addEventListener(new FutureEventListener<Boolean>() { + @Override + public void onFailure(Throwable cause) { + System.out.println("Failed to delete subscriber for stream " + s); + cause.printStackTrace(); + countDownLatch.countDown(); + } + + @Override + public void onSuccess(Boolean value) { + countDownLatch.countDown(); + } + }); + countDownLatch.await(); + dlm.close(); + } + } + } + public DistributedLogTool() { super(); addCommand(new AuditBKSpaceCommand()); @@ -2748,6 +2864,7 @@ public class DistributedLogTool extends Tool { addCommand(new DeserializeDLSNCommand()); addCommand(new SerializeDLSNCommand()); addCommand(new WatchNamespaceCommand()); + addCommand(new DeleteSubscriberCommand()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/9d467a60/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java index 0731117..fce9bcd 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Utils.java @@ -432,6 +432,44 @@ public class Utils { return promise; } + /** + * Delete the given <i>path</i> from zookeeper. + * + * @param zkc + * zookeeper client + * @param path + * path to delete + * @param version + * version used to set data + * @return future representing if the delete is successful. Return true if the node is deleted, + * false if the node doesn't exist, otherwise future will throw exception + * + */ + public static Future<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) { + ZooKeeper zk; + try { + zk = zkc.get(); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + return Future.exception(FutureUtils.zkException(e, path)); + } catch (InterruptedException e) { + return Future.exception(FutureUtils.zkException(e, path)); + } + final Promise<Boolean> promise = new Promise<Boolean>(); + zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (KeeperException.Code.OK.intValue() == rc ) { + promise.setValue(true); + } else if (KeeperException.Code.NONODE.intValue() == rc) { + promise.setValue(false); + } else { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + return promise; + } + public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable, boolean swallowIOException) { if (null == closeable) {