http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java deleted file mode 100644 index 74cd6cf..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java +++ /dev/null @@ -1,402 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.util.FailpointUtils; -import com.twitter.distributedlog.zk.ZKWatcherManager; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; -import org.apache.bookkeeper.zookeeper.RetryPolicy; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}. - * It handles retries on session expires and provides a watcher manager {@link ZKWatcherManager}. - * - * <h3>Metrics</h3> - * <ul> - * <li> zookeeper operation stats are exposed under scope <code>zk</code> by - * {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient} - * <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by - * {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase} - * <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code> - * </ul> - */ -public class ZooKeeperClient { - - public static interface Credentials { - - Credentials NONE = new Credentials() { - @Override - public void authenticate(ZooKeeper zooKeeper) { - // noop - } - }; - - void authenticate(ZooKeeper zooKeeper); - } - - public static class DigestCredentials implements Credentials { - - String username; - String password; - - public DigestCredentials(String username, String password) { - this.username = username; - this.password = password; - } - - @Override - public void authenticate(ZooKeeper zooKeeper) { - zooKeeper.addAuthInfo("digest", String.format("%s:%s", username, password).getBytes(UTF_8)); - } - } - - public interface ZooKeeperSessionExpireNotifier { - void notifySessionExpired(); - } - - /** - * Indicates an error connecting to a zookeeper cluster. - */ - public static class ZooKeeperConnectionException extends IOException { - private static final long serialVersionUID = 6682391687004819361L; - - public ZooKeeperConnectionException(String message) { - super(message); - } - - public ZooKeeperConnectionException(String message, Throwable cause) { - super(message, cause); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName()); - - private final String name; - private final int sessionTimeoutMs; - private final int defaultConnectionTimeoutMs; - private final String zooKeeperServers; - // GuardedBy "this", but still volatile for tests, where we want to be able to see writes - // made from within long synchronized blocks. - private volatile ZooKeeper zooKeeper = null; - private final RetryPolicy retryPolicy; - private final StatsLogger statsLogger; - private final int retryThreadCount; - private final double requestRateLimit; - private final Credentials credentials; - private volatile boolean authenticated = false; - private Stopwatch disconnectedStopwatch = null; - - private boolean closed = false; - - final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); - - // watcher manager to manage watchers - private final ZKWatcherManager watcherManager; - - /** - * Creates an unconnected client that will lazily attempt to connect on the first call to - * {@link #get}. All successful connections will be authenticated with the given - * {@code credentials}. - * - * @param sessionTimeoutMs - * ZK session timeout in milliseconds - * @param connectionTimeoutMs - * ZK connection timeout in milliseconds - * @param zooKeeperServers - * the set of servers forming the ZK cluster - */ - ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) { - this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0, - Credentials.NONE); - } - - ZooKeeperClient(String name, - int sessionTimeoutMs, - int connectionTimeoutMs, - String zooKeeperServers, - RetryPolicy retryPolicy, - StatsLogger statsLogger, - int retryThreadCount, - double requestRateLimit, - Credentials credentials) { - this.name = name; - this.sessionTimeoutMs = sessionTimeoutMs; - this.zooKeeperServers = zooKeeperServers; - this.defaultConnectionTimeoutMs = connectionTimeoutMs; - this.retryPolicy = retryPolicy; - this.statsLogger = statsLogger; - this.retryThreadCount = retryThreadCount; - this.requestRateLimit = requestRateLimit; - this.credentials = credentials; - this.watcherManager = ZKWatcherManager.newBuilder() - .name(name) - .zkc(this) - .statsLogger(statsLogger.scope("watcher_manager")) - .build(); - } - - public List<ACL> getDefaultACL() { - if (Credentials.NONE == credentials) { - return ZooDefs.Ids.OPEN_ACL_UNSAFE; - } else { - return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL; - } - } - - public ZKWatcherManager getWatcherManager() { - return watcherManager; - } - - /** - * Returns the current active ZK connection or establishes a new one if none has yet been - * established or a previous connection was disconnected or had its session time out. - * - * @return a connected ZooKeeper client - * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster - * @throws InterruptedException if interrupted while waiting for a connection to be established - * @throws TimeoutException if a connection could not be established within the configured - * session timeout - */ - public synchronized ZooKeeper get() - throws ZooKeeperConnectionException, InterruptedException { - - try { - FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss); - } catch (IOException ioe) { - throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe); - } - - // This indicates that the client was explictly closed - if (closed) { - throw new ZooKeeperConnectionException("Client " + name + " has already been closed"); - } - - // the underneath zookeeper is retryable zookeeper - if (zooKeeper != null && retryPolicy != null) { - if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) { - // the zookeeper client is connected - disconnectedStopwatch = null; - } else { - if (disconnectedStopwatch == null) { - disconnectedStopwatch = Stopwatch.createStarted(); - } else { - long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS); - if (disconnectedMs > defaultConnectionTimeoutMs) { - closeInternal(); - authenticated = false; - } - } - } - } - - if (zooKeeper == null) { - zooKeeper = buildZooKeeper(); - disconnectedStopwatch = null; - } - - // In case authenticate throws an exception, the caller can try to recover the client by - // calling get again. - if (!authenticated) { - credentials.authenticate(zooKeeper); - authenticated = true; - } - - return zooKeeper; - } - - private ZooKeeper buildZooKeeper() - throws ZooKeeperConnectionException, InterruptedException { - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - switch (event.getType()) { - case None: - switch (event.getState()) { - case Expired: - if (null == retryPolicy) { - LOG.info("ZooKeeper {}' session expired. Event: {}", name, event); - closeInternal(); - } - authenticated = false; - break; - case Disconnected: - if (null == retryPolicy) { - LOG.info("ZooKeeper {} is disconnected from zookeeper now," + - " but it is OK unless we received EXPIRED event.", name); - } - // Mark as not authenticated if expired or disconnected. In both cases - // we lose any attached auth info. Relying on Expired/Disconnected is - // sufficient since all Expired/Disconnected events are processed before - // all SyncConnected events, and the underlying member is not updated until - // SyncConnected is received. - authenticated = false; - break; - default: - break; - } - } - - try { - for (Watcher watcher : watchers) { - try { - watcher.process(event); - } catch (Throwable t) { - LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t); - } - } - } catch (Throwable t) { - LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t); - } - } - }; - - Set<Watcher> watchers = new HashSet<Watcher>(); - watchers.add(watcher); - - ZooKeeper zk; - try { - RetryPolicy opRetryPolicy = null == retryPolicy ? - new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy; - RetryPolicy connectRetryPolicy = null == retryPolicy ? - new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : - new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE); - zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder() - .connectString(zooKeeperServers) - .sessionTimeoutMs(sessionTimeoutMs) - .watchers(watchers) - .operationRetryPolicy(opRetryPolicy) - .connectRetryPolicy(connectRetryPolicy) - .statsLogger(statsLogger) - .retryThreadCount(retryThreadCount) - .requestRateLimit(requestRateLimit) - .build(); - } catch (KeeperException e) { - throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e); - } catch (IOException e) { - throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e); - } - return zk; - } - - /** - * Clients that need to re-establish state after session expiration can register an - * {@code onExpired} command to execute. - * - * @param onExpired the {@code Command} to register - * @return the new {@link Watcher} which can later be passed to {@link #unregister} for - * removal. - */ - public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) { - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { - try { - onExpired.notifySessionExpired(); - } catch (Exception exc) { - // do nothing - } - } - } - }; - register(watcher); - return watcher; - } - - /** - * Clients that need to register a top-level {@code Watcher} should do so using this method. The - * registered {@code watcher} will remain registered across re-connects and session expiration - * events. - * - * @param watcher the {@code Watcher to register} - */ - public void register(Watcher watcher) { - if (null != watcher) { - watchers.add(watcher); - } - } - - /** - * Clients can attempt to unregister a top-level {@code Watcher} that has previously been - * registered. - * - * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch - * @return whether the given {@code Watcher} was found and removed from the active set - */ - public boolean unregister(Watcher watcher) { - return null != watcher && watchers.remove(watcher); - } - - /** - * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent - * calls to this method will no-op until the next successful {@link #get}. - */ - public synchronized void closeInternal() { - if (zooKeeper != null) { - try { - LOG.info("Closing zookeeper client {}.", name); - zooKeeper.close(); - LOG.info("Closed zookeeper client {}.", name); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e); - } finally { - zooKeeper = null; - } - } - } - - /** - * Closes the the underlying zookeeper instance. - * Subsequent attempts to {@link #get} will fail - */ - public synchronized void close() { - if (closed) { - return; - } - LOG.info("Close zookeeper client {}.", name); - closeInternal(); - // unregister gauges to prevent GC spiral - this.watcherManager.unregisterGauges(); - closed = true; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java deleted file mode 100644 index 15f1805..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClientBuilder.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.ZooKeeperClient.Credentials; -import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.zookeeper.RetryPolicy; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; - -/** - * Builder to build zookeeper client. - */ -public class ZooKeeperClientBuilder { - - static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClientBuilder.class); - - /** - * Create a zookeeper client builder to build zookeeper clients. - * - * @return zookeeper client builder. - */ - public static ZooKeeperClientBuilder newBuilder() { - return new ZooKeeperClientBuilder(); - } - - // name - private String name = "default"; - // sessionTimeoutMs - private int sessionTimeoutMs = -1; - // conectionTimeoutMs - private int conectionTimeoutMs = -1; - // zkServers - private String zkServers = null; - // retry policy - private RetryPolicy retryPolicy = null; - // stats logger - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - // retry executor thread count - private int retryThreadCount = 1; - // zookeeper access requestRateLimit limit - private double requestRateLimit = 0; - // Did call the zkAclId setter on the builder, used to ensure the setter is set. - private boolean zkAclIdSet = false; - private String zkAclId; - - // Cached ZooKeeper Client - private ZooKeeperClient cachedClient = null; - - private ZooKeeperClientBuilder() {} - - /** - * Set zookeeper client name - * - * @param name zookeeper client name - * @return zookeeper client builder - */ - public synchronized ZooKeeperClientBuilder name(String name) { - this.name = name; - return this; - } - - /** - * Set zookeeper session timeout in milliseconds. - * - * @param sessionTimeoutMs - * session timeout in milliseconds. - * @return zookeeper client builder. - */ - public synchronized ZooKeeperClientBuilder sessionTimeoutMs(int sessionTimeoutMs) { - this.sessionTimeoutMs = sessionTimeoutMs; - if (this.conectionTimeoutMs <= 0) { - this.conectionTimeoutMs = 2 * sessionTimeoutMs; - } - return this; - } - - public synchronized ZooKeeperClientBuilder retryThreadCount(int retryThreadCount) { - this.retryThreadCount = retryThreadCount; - return this; - } - - public synchronized ZooKeeperClientBuilder requestRateLimit(double requestRateLimit) { - this.requestRateLimit = requestRateLimit; - return this; - } - - /** - * Set zookeeper connection timeout in milliseconds - * - * @param connectionTimeoutMs - * connection timeout ms. - * @return builder - */ - public synchronized ZooKeeperClientBuilder connectionTimeoutMs(int connectionTimeoutMs) { - this.conectionTimeoutMs = connectionTimeoutMs; - return this; - } - - /** - * Set ZooKeeper Connect String. - * - * @param zkServers - * zookeeper servers to connect. - * @return builder - */ - public synchronized ZooKeeperClientBuilder zkServers(String zkServers) { - this.zkServers = zkServers; - return this; - } - - /** - * Set DistributedLog URI. - * - * @param uri - * distributedlog uri. - * @return builder. - */ - public synchronized ZooKeeperClientBuilder uri(URI uri) { - this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); - return this; - } - - /** - * Build zookeeper client using existing <i>zkc</i> client. - * - * @param zkc - * zookeeper client. - * @return builder - */ - public synchronized ZooKeeperClientBuilder zkc(ZooKeeperClient zkc) { - this.cachedClient = zkc; - return this; - } - - /** - * Build zookeeper client with given retry policy <i>retryPolicy</i>. - * - * @param retryPolicy - * retry policy - * @return builder - */ - public synchronized ZooKeeperClientBuilder retryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; - return this; - } - - /** - * Build zookeeper client with given stats logger <i>statsLogger</i>. - * - * @param statsLogger - * stats logger to expose zookeeper stats - * @return builder - */ - public synchronized ZooKeeperClientBuilder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - /** - * * Build zookeeper client with given zk acl digest id <i>zkAclId</i>. - */ - public synchronized ZooKeeperClientBuilder zkAclId(String zkAclId) { - this.zkAclIdSet = true; - this.zkAclId = zkAclId; - return this; - } - - private void validateParameters() { - Preconditions.checkNotNull(zkServers, "No zk servers provided."); - Preconditions.checkArgument(conectionTimeoutMs > 0, - "Invalid connection timeout : %d", conectionTimeoutMs); - Preconditions.checkArgument(sessionTimeoutMs > 0, - "Invalid session timeout : %d", sessionTimeoutMs); - Preconditions.checkNotNull(statsLogger, "No stats logger provided."); - Preconditions.checkArgument(zkAclIdSet, "Zookeeper acl id not set."); - } - - /** - * Build a zookeeper client. - * - * @return zookeeper client. - */ - public synchronized ZooKeeperClient build() { - if (null == cachedClient) { - cachedClient = buildClient(); - } - return cachedClient; - } - - private ZooKeeperClient buildClient() { - validateParameters(); - - Credentials credentials = Credentials.NONE; - if (null != zkAclId) { - credentials = new DigestCredentials(zkAclId, zkAclId); - } - - return new ZooKeeperClient( - name, - sessionTimeoutMs, - conectionTimeoutMs, - zkServers, - retryPolicy, - statsLogger, - retryThreadCount, - requestRateLimit, - credentials - ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java deleted file mode 100644 index 5fcc87e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/AccessControlManager.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.acl; - -/** - * Access Control on stream operations - */ -public interface AccessControlManager { - - /** - * Whether allowing writing to a stream. - * - * @param stream - * Stream to write - * @return true if allowing writing to the given stream, otherwise false. - */ - boolean allowWrite(String stream); - - /** - * Whether allowing truncating a given stream. - * - * @param stream - * Stream to truncate - * @return true if allowing truncating a given stream. - */ - boolean allowTruncate(String stream); - - /** - * Whether allowing deleting a given stream. - * - * @param stream - * Stream to delete - * @return true if allowing deleting a given stream. - */ - boolean allowDelete(String stream); - - /** - * Whether allowing proxies to acquire a given stream. - * - * @param stream - * stream to acquire - * @return true if allowing proxies to acquire the given stream. - */ - boolean allowAcquire(String stream); - - /** - * Whether allowing proxies to release ownership for a given stream. - * - * @param stream - * stream to release - * @return true if allowing proxies to release a given stream. - */ - boolean allowRelease(String stream); - - /** - * Close the access control manager. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java deleted file mode 100644 index e757595..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/DefaultAccessControlManager.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.acl; - -public class DefaultAccessControlManager implements AccessControlManager { - - public static final DefaultAccessControlManager INSTANCE = new DefaultAccessControlManager(); - - private DefaultAccessControlManager() { - } - - @Override - public boolean allowWrite(String stream) { - return true; - } - - @Override - public boolean allowTruncate(String stream) { - return true; - } - - @Override - public boolean allowDelete(String stream) { - return true; - } - - @Override - public boolean allowAcquire(String stream) { - return true; - } - - @Override - public boolean allowRelease(String stream) { - return true; - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java deleted file mode 100644 index 65109fc..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/acl/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Access Control for distributedlog streams. - */ -package com.twitter.distributedlog.acl; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java deleted file mode 100644 index 0512907..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/DistributedLogAdmin.java +++ /dev/null @@ -1,921 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.admin; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ReadUtils; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import com.twitter.distributedlog.impl.acl.ZKAccessControl; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.impl.federated.FederatedZKLogMetadataStore; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.DLMetadata; -import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; -import com.twitter.distributedlog.metadata.MetadataUpdater; -import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.distributedlog.tools.DistributedLogTool; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.util.Await; -import com.twitter.util.Function; -import com.twitter.util.Future; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.util.IOUtils; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Admin Tool for DistributedLog. - */ -public class DistributedLogAdmin extends DistributedLogTool { - - static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class); - - /** - * Fix inprogress segment with lower ledger sequence number. - * - * @param namespace - * dl namespace - * @param metadataUpdater - * metadata updater. - * @param streamName - * stream name. - * @param verbose - * print verbose messages. - * @param interactive - * is confirmation needed before executing actual action. - * @throws IOException - */ - public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace, - final MetadataUpdater metadataUpdater, - final String streamName, - final boolean verbose, - final boolean interactive) throws IOException { - DistributedLogManager dlm = namespace.openLog(streamName); - try { - List<LogSegmentMetadata> segments = dlm.getLogSegments(); - if (verbose) { - System.out.println("LogSegments for " + streamName + " : "); - for (LogSegmentMetadata segment : segments) { - System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment); - } - } - LOG.info("Get log segments for {} : {}", streamName, segments); - // validate log segments - long maxCompletedLogSegmentSequenceNumber = -1L; - LogSegmentMetadata inprogressSegment = null; - for (LogSegmentMetadata segment : segments) { - if (!segment.isInProgress()) { - maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber, segment.getLogSegmentSequenceNumber()); - } else { - // we already found an inprogress segment - if (null != inprogressSegment) { - throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments); - } - inprogressSegment = segment; - } - } - if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedLogSegmentSequenceNumber) { - // nothing to fix - return; - } - final long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1; - if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) { - return; - } - final LogSegmentMetadata newSegment = - FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newLogSegmentSequenceNumber)); - LOG.info("Fixed {} : {} -> {} ", - new Object[] { streamName, inprogressSegment, newSegment }); - if (verbose) { - System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName() - + " -> " + newSegment.getZNodeName()); - System.out.println("\t old: " + inprogressSegment); - System.out.println("\t new: " + newSegment); - System.out.println(); - } - } finally { - dlm.close(); - } - } - - private static class LogSegmentCandidate { - final LogSegmentMetadata metadata; - final LogRecordWithDLSN lastRecord; - - LogSegmentCandidate(LogSegmentMetadata metadata, LogRecordWithDLSN lastRecord) { - this.metadata = metadata; - this.lastRecord = lastRecord; - } - - @Override - public String toString() { - return "LogSegmentCandidate[ metadata = " + metadata + ", last record = " + lastRecord + " ]"; - } - - } - - private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR = - new Comparator<LogSegmentCandidate>() { - @Override - public int compare(LogSegmentCandidate o1, LogSegmentCandidate o2) { - return LogSegmentMetadata.COMPARATOR.compare(o1.metadata, o2.metadata); - } - }; - - private static class StreamCandidate { - - final String streamName; - final SortedSet<LogSegmentCandidate> segmentCandidates = - new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR); - - StreamCandidate(String streamName) { - this.streamName = streamName; - } - - synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) { - segmentCandidates.add(segmentCandidate); - } - - @Override - public String toString() { - return "StreamCandidate[ name = " + streamName + ", segments = " + segmentCandidates + " ]"; - } - } - - public static void checkAndRepairDLNamespace(final URI uri, - final DistributedLogNamespace namespace, - final MetadataUpdater metadataUpdater, - final OrderedScheduler scheduler, - final boolean verbose, - final boolean interactive) throws IOException { - checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1); - } - - public static void checkAndRepairDLNamespace(final URI uri, - final DistributedLogNamespace namespace, - final MetadataUpdater metadataUpdater, - final OrderedScheduler scheduler, - final boolean verbose, - final boolean interactive, - final int concurrency) throws IOException { - Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found."); - // 0. getting streams under a given uri. - Iterator<String> streamsIter = namespace.getLogs(); - List<String> streams = Lists.newArrayList(); - while (streamsIter.hasNext()) { - streams.add(streamsIter.next()); - } - if (verbose) { - System.out.println("- 0. checking streams under " + uri); - } - if (streams.size() == 0) { - System.out.println("+ 0. nothing to check. quit."); - return; - } - Map<String, StreamCandidate> streamCandidates = - checkStreams(namespace, streams, scheduler, concurrency); - if (verbose) { - System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found."); - } - if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + streamCandidates.size() + " corrupted streams (Y/N) : ")) { - return; - } - if (verbose) { - System.out.println("- 1. repairing " + streamCandidates.size() + " corrupted streams."); - } - for (StreamCandidate candidate : streamCandidates.values()) { - if (!repairStream(metadataUpdater, candidate, verbose, interactive)) { - if (verbose) { - System.out.println("* 1. aborted repairing corrupted streams."); - } - return; - } - } - if (verbose) { - System.out.println("+ 1. repaired " + streamCandidates.size() + " corrupted streams."); - } - } - - private static Map<String, StreamCandidate> checkStreams( - final DistributedLogNamespace namespace, - final Collection<String> streams, - final OrderedScheduler scheduler, - final int concurrency) throws IOException { - final LinkedBlockingQueue<String> streamQueue = - new LinkedBlockingQueue<String>(); - streamQueue.addAll(streams); - final Map<String, StreamCandidate> candidateMap = - new ConcurrentSkipListMap<String, StreamCandidate>(); - final AtomicInteger numPendingStreams = new AtomicInteger(streams.size()); - final CountDownLatch doneLatch = new CountDownLatch(1); - Runnable checkRunnable = new Runnable() { - @Override - public void run() { - while (!streamQueue.isEmpty()) { - String stream; - try { - stream = streamQueue.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - StreamCandidate candidate; - try { - LOG.info("Checking stream {}.", stream); - candidate = checkStream(namespace, stream, scheduler); - LOG.info("Checked stream {} - {}.", stream, candidate); - } catch (IOException e) { - LOG.error("Error on checking stream {} : ", stream, e); - doneLatch.countDown(); - break; - } - if (null != candidate) { - candidateMap.put(stream, candidate); - } - if (numPendingStreams.decrementAndGet() == 0) { - doneLatch.countDown(); - } - } - } - }; - Thread[] threads = new Thread[concurrency]; - for (int i = 0; i < concurrency; i++) { - threads[i] = new Thread(checkRunnable, "check-thread-" + i); - threads[i].start(); - } - try { - doneLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (numPendingStreams.get() != 0) { - throw new IOException(numPendingStreams.get() + " streams left w/o checked"); - } - for (int i = 0; i < concurrency; i++) { - threads[i].interrupt(); - try { - threads[i].join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - return candidateMap; - } - - private static StreamCandidate checkStream( - final DistributedLogNamespace namespace, - final String streamName, - final OrderedScheduler scheduler) throws IOException { - DistributedLogManager dlm = namespace.openLog(streamName); - try { - List<LogSegmentMetadata> segments = dlm.getLogSegments(); - if (segments.isEmpty()) { - return null; - } - List<Future<LogSegmentCandidate>> futures = - new ArrayList<Future<LogSegmentCandidate>>(segments.size()); - for (LogSegmentMetadata segment : segments) { - futures.add(checkLogSegment(namespace, streamName, segment, scheduler)); - } - List<LogSegmentCandidate> segmentCandidates; - try { - segmentCandidates = Await.result(Future.collect(futures)); - } catch (Exception e) { - throw new IOException("Failed on checking stream " + streamName, e); - } - StreamCandidate streamCandidate = new StreamCandidate(streamName); - for (LogSegmentCandidate segmentCandidate: segmentCandidates) { - if (null != segmentCandidate) { - streamCandidate.addLogSegmentCandidate(segmentCandidate); - } - } - if (streamCandidate.segmentCandidates.isEmpty()) { - return null; - } - return streamCandidate; - } finally { - dlm.close(); - } - } - - private static Future<LogSegmentCandidate> checkLogSegment( - final DistributedLogNamespace namespace, - final String streamName, - final LogSegmentMetadata metadata, - final OrderedScheduler scheduler) { - if (metadata.isInProgress()) { - return Future.value(null); - } - - final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver() - .getLogSegmentEntryStore(NamespaceDriver.Role.READER); - return ReadUtils.asyncReadLastRecord( - streamName, - metadata, - true, - false, - true, - 4, - 16, - new AtomicInteger(0), - scheduler, - entryStore - ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() { - @Override - public LogSegmentCandidate apply(LogRecordWithDLSN record) { - if (null != record && - (record.getDlsn().compareTo(metadata.getLastDLSN()) > 0 || - record.getTransactionId() > metadata.getLastTxId() || - !metadata.isRecordPositionWithinSegmentScope(record))) { - return new LogSegmentCandidate(metadata, record); - } else { - return null; - } - } - }); - } - - private static boolean repairStream(MetadataUpdater metadataUpdater, - StreamCandidate streamCandidate, - boolean verbose, - boolean interactive) throws IOException { - if (verbose) { - System.out.println("Stream " + streamCandidate.streamName + " : "); - for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) { - System.out.println(" " + segmentCandidate.metadata.getLogSegmentSequenceNumber() - + " : metadata = " + segmentCandidate.metadata + ", last dlsn = " - + segmentCandidate.lastRecord.getDlsn()); - } - System.out.println("-------------------------------------------"); - } - if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) { - return false; - } - for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) { - LogSegmentMetadata newMetadata = FutureUtils.result( - metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord)); - if (verbose) { - System.out.println(" Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : "); - System.out.println(" old metadata : " + segmentCandidate.metadata); - System.out.println(" new metadata : " + newMetadata); - } - } - if (verbose) { - System.out.println("-------------------------------------------"); - } - return true; - } - - // - // Commands - // - - /** - * Unbind the bookkeeper environment for a given distributedlog uri. - * - * TODO: move unbind operation to namespace driver - */ - class UnbindCommand extends OptsCommand { - - Options options = new Options(); - - UnbindCommand() { - super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance."); - options.addOption("f", "force", false, "Force unbinding without prompt."); - } - - @Override - protected Options getOptions() { - return options; - } - - @Override - protected String getUsage() { - return "unbind [options] <distributedlog uri>"; - } - - @Override - protected int runCmd(CommandLine cmdline) throws Exception { - String[] args = cmdline.getArgs(); - if (args.length <= 0) { - System.err.println("No distributedlog uri specified."); - printUsage(); - return -1; - } - boolean force = cmdline.hasOption("f"); - URI uri = URI.create(args[0]); - // resolving the uri to see if there is another bindings in this uri. - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri) - .sessionTimeoutMs(10000).build(); - BKDLConfig bkdlConfig; - try { - bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); - } catch (IOException ie) { - bkdlConfig = null; - } - if (null == bkdlConfig) { - System.out.println("No bookkeeper is bound to " + uri); - return 0; - } else { - System.out.println("There is bookkeeper bound to " + uri + " : "); - System.out.println(""); - System.out.println(bkdlConfig.toString()); - System.out.println(""); - if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) { - return 0; - } - } - DLMetadata.unbind(uri); - System.out.println("Unbound on " + uri + "."); - return 0; - } - } - - /** - * Bind Command to bind bookkeeper environment for a given distributed uri. - * - * TODO: move bind to namespace driver - */ - class BindCommand extends OptsCommand { - - Options options = new Options(); - - BindCommand() { - super("bind", "bind the bookkeeper environment settings for a given distributedlog instance."); - options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance."); - options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers."); - options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers."); - options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers."); - options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers."); - options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id."); - options.addOption("r", "encodeRegionID", true, "Flag to encode region id."); - options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade"); - options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace"); - options.addOption("f", "force", false, "Force binding without prompt."); - options.addOption("c", "creation", false, "Whether is it a creation binding."); - options.addOption("q", "query", false, "Query the bookkeeper bindings"); - } - - @Override - protected Options getOptions() { - return options; - } - - @Override - protected String getUsage() { - return "bind [options] <distributedlog uri>"; - } - - @Override - protected int runCmd(CommandLine cmdline) throws Exception { - boolean isQuery = cmdline.hasOption("q"); - if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) { - System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment."); - printUsage(); - return -1; - } - String[] args = cmdline.getArgs(); - if (args.length <= 0) { - System.err.println("No distributedlog uri specified."); - printUsage(); - return -1; - } - boolean force = cmdline.hasOption("f"); - boolean creation = cmdline.hasOption("c"); - String bkLedgersPath = cmdline.getOptionValue("l"); - String bkZkServersForWriter = cmdline.getOptionValue("s"); - boolean sanityCheckTxnID = - !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i")); - boolean encodeRegionID = - cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r")); - - String bkZkServersForReader; - if (cmdline.hasOption("bkzr")) { - bkZkServersForReader = cmdline.getOptionValue("bkzr"); - } else { - bkZkServersForReader = bkZkServersForWriter; - } - - URI uri = URI.create(args[0]); - - String dlZkServersForWriter; - String dlZkServersForReader; - if (cmdline.hasOption("dlzw")) { - dlZkServersForWriter = cmdline.getOptionValue("dlzw"); - } else { - dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri); - } - if (cmdline.hasOption("dlzr")) { - dlZkServersForReader = cmdline.getOptionValue("dlzr"); - } else { - dlZkServersForReader = dlZkServersForWriter; - } - - // resolving the uri to see if there is another bindings in this uri. - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null) - .sessionTimeoutMs(10000).build(); - try { - BKDLConfig newBKDLConfig = - new BKDLConfig(dlZkServersForWriter, dlZkServersForReader, - bkZkServersForWriter, bkZkServersForReader, bkLedgersPath) - .setSanityCheckTxnID(sanityCheckTxnID) - .setEncodeRegionID(encodeRegionID); - - if (cmdline.hasOption("seqno")) { - newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno"))); - } - - if (cmdline.hasOption("fns")) { - newBKDLConfig = newBKDLConfig.setFederatedNamespace(true); - } - - BKDLConfig bkdlConfig; - try { - bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); - } catch (IOException ie) { - bkdlConfig = null; - } - if (null == bkdlConfig) { - System.out.println("No bookkeeper is bound to " + uri); - } else { - System.out.println("There is bookkeeper bound to " + uri + " : "); - System.out.println(""); - System.out.println(bkdlConfig.toString()); - System.out.println(""); - if (!isQuery) { - if (newBKDLConfig.equals(bkdlConfig)) { - System.out.println("No bookkeeper binding needs to be updated. Quit."); - return 0; - } else if(!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) { - System.out.println("You can't turn a federated namespace back to non-federated."); - return 0; - } else { - if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri - + " with new bookkeeper instance :\n" + newBKDLConfig)) { - return 0; - } - } - } - } - if (isQuery) { - System.out.println("Done."); - return 0; - } - DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig); - if (creation) { - try { - dlMetadata.create(uri); - System.out.println("Created binding on " + uri + "."); - } catch (IOException ie) { - System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage()); - } - } else { - try { - dlMetadata.update(uri); - System.out.println("Updated binding on " + uri + " : "); - System.out.println(""); - System.out.println(newBKDLConfig.toString()); - System.out.println(""); - } catch (IOException ie) { - System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage()); - } - } - if (newBKDLConfig.isFederatedNamespace()) { - try { - FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc); - } catch (KeeperException.NodeExistsException nee) { - // ignore node exists exception - } - } - return 0; - } finally { - zkc.close(); - } - } - } - - static class RepairSeqNoCommand extends PerDLCommand { - - boolean dryrun = false; - boolean verbose = false; - final List<String> streams = new ArrayList<String>(); - - RepairSeqNoCommand() { - super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number."); - options.addOption("d", "dryrun", false, "Dry run without repairing"); - options.addOption("l", "list", true, "List of streams to repair, separated by comma"); - options.addOption("v", "verbose", false, "Print verbose messages"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - dryrun = cmdline.hasOption("d"); - verbose = cmdline.hasOption("v"); - force = !dryrun && cmdline.hasOption("f"); - if (!cmdline.hasOption("l")) { - throw new ParseException("No streams provided to repair"); - } - String streamsList = cmdline.getOptionValue("l"); - Collections.addAll(streams, streamsList.split(",")); - } - - @Override - protected int runCmd() throws Exception { - MetadataUpdater metadataUpdater = dryrun ? - new DryrunLogSegmentMetadataStoreUpdater(getConf(), - getLogSegmentMetadataStore()) : - LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), - getLogSegmentMetadataStore()); - System.out.println("List of streams : "); - System.out.println(streams); - if (!IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):")) { - return -1; - } - for (String stream : streams) { - fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce()); - } - return 0; - } - - @Override - protected String getUsage() { - return "repairseqno [options]"; - } - } - - static class DLCKCommand extends PerDLCommand { - - boolean dryrun = false; - boolean verbose = false; - int concurrency = 1; - - DLCKCommand() { - super("dlck", "Check and repair a distributedlog namespace"); - options.addOption("d", "dryrun", false, "Dry run without repairing"); - options.addOption("v", "verbose", false, "Print verbose messages"); - options.addOption("cy", "concurrency", true, "Concurrency on checking streams"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - dryrun = cmdline.hasOption("d"); - verbose = cmdline.hasOption("v"); - if (cmdline.hasOption("cy")) { - try { - concurrency = Integer.parseInt(cmdline.getOptionValue("cy")); - } catch (NumberFormatException nfe) { - throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy")); - } - } - } - - @Override - protected int runCmd() throws Exception { - MetadataUpdater metadataUpdater = dryrun ? - new DryrunLogSegmentMetadataStoreUpdater(getConf(), - getLogSegmentMetadataStore()) : - LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), - getLogSegmentMetadataStore()); - OrderedScheduler scheduler = OrderedScheduler.newBuilder() - .name("dlck-scheduler") - .corePoolSize(Runtime.getRuntime().availableProcessors()) - .build(); - ExecutorService executorService = Executors.newCachedThreadPool(); - try { - checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler, - verbose, !getForce(), concurrency); - } finally { - SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES); - } - return 0; - } - - @Override - protected String getUsage() { - return "dlck [options]"; - } - } - - static class DeleteStreamACLCommand extends PerDLCommand { - - String stream = null; - - DeleteStreamACLCommand() { - super("delete_stream_acl", "Delete ACL for a given stream"); - options.addOption("s", "stream", true, "Stream to set ACL"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("s")) { - throw new ParseException("No stream to set ACL"); - } - stream = cmdline.getOptionValue("s"); - } - - @Override - protected int runCmd() throws Exception { - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri()); - if (null == bkdlConfig.getACLRootPath()) { - // acl isn't enabled for this namespace. - System.err.println("ACL isn't enabled for namespace " + getUri()); - return -1; - } - String zkPath = getUri() + "/" + bkdlConfig.getACLRootPath() + "/" + stream; - ZKAccessControl.delete(getZooKeeperClient(), zkPath); - return 0; - } - - @Override - protected String getUsage() { - return null; - } - } - - static class SetStreamACLCommand extends SetACLCommand { - - String stream = null; - - SetStreamACLCommand() { - super("set_stream_acl", "Set Default ACL for a given stream"); - options.addOption("s", "stream", true, "Stream to set ACL"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("s")) { - throw new ParseException("No stream to set ACL"); - } - stream = cmdline.getOptionValue("s"); - } - - @Override - protected String getZKPath(String zkRootPath) { - return zkRootPath + "/" + stream; - } - - @Override - protected String getUsage() { - return "set_stream_acl [options]"; - } - } - - static class SetDefaultACLCommand extends SetACLCommand { - - SetDefaultACLCommand() { - super("set_default_acl", "Set Default ACL for a namespace"); - } - - @Override - protected String getZKPath(String zkRootPath) { - return zkRootPath; - } - - @Override - protected String getUsage() { - return "set_default_acl [options]"; - } - } - - static abstract class SetACLCommand extends PerDLCommand { - - boolean denyWrite = false; - boolean denyTruncate = false; - boolean denyDelete = false; - boolean denyAcquire = false; - boolean denyRelease = false; - - protected SetACLCommand(String name, String description) { - super(name, description); - options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests"); - options.addOption("dt", "deny-truncate", false, "Deny truncate requests"); - options.addOption("dd", "deny-delete", false, "Deny delete requests"); - options.addOption("da", "deny-acquire", false, "Deny acquire requests"); - options.addOption("dr", "deny-release", false, "Deny release requests"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - denyWrite = cmdline.hasOption("dw"); - denyTruncate = cmdline.hasOption("dt"); - denyDelete = cmdline.hasOption("dd"); - denyAcquire = cmdline.hasOption("da"); - denyRelease = cmdline.hasOption("dr"); - } - - protected abstract String getZKPath(String zkRootPath); - - protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception { - ZKAccessControl accessControl; - try { - accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null)); - } catch (KeeperException.NoNodeException nne) { - accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath); - } - return accessControl; - } - - protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception { - String zkPath = accessControl.getZKPath(); - if (null == zkc.get().exists(zkPath, false)) { - accessControl.create(zkc); - } else { - accessControl.update(zkc); - } - } - - @Override - protected int runCmd() throws Exception { - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri()); - if (null == bkdlConfig.getACLRootPath()) { - // acl isn't enabled for this namespace. - System.err.println("ACL isn't enabled for namespace " + getUri()); - return -1; - } - String zkPath = getZKPath(getUri().getPath() + "/" + bkdlConfig.getACLRootPath()); - ZKAccessControl accessControl = getZKAccessControl(getZooKeeperClient(), zkPath); - AccessControlEntry acl = accessControl.getAccessControlEntry(); - acl.setDenyWrite(denyWrite); - acl.setDenyTruncate(denyTruncate); - acl.setDenyDelete(denyDelete); - acl.setDenyAcquire(denyAcquire); - acl.setDenyRelease(denyRelease); - setZKAccessControl(getZooKeeperClient(), accessControl); - return 0; - } - - } - - public DistributedLogAdmin() { - super(); - commands.clear(); - addCommand(new HelpCommand()); - addCommand(new BindCommand()); - addCommand(new UnbindCommand()); - addCommand(new RepairSeqNoCommand()); - addCommand(new DLCKCommand()); - addCommand(new SetDefaultACLCommand()); - addCommand(new SetStreamACLCommand()); - addCommand(new DeleteStreamACLCommand()); - } - - @Override - protected String getName() { - return "dlog_admin"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java deleted file mode 100644 index a7d6adb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/admin/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Admin Tools for DistributedLog - */ -package com.twitter.distributedlog.admin;