http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java new file mode 100644 index 0000000..e56a22d --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClient.java @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.util.FailpointUtils; +import org.apache.distributedlog.zk.ZKWatcherManager; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.RetryPolicy; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}. + * It handles retries on session expires and provides a watcher manager {@link ZKWatcherManager}. 
+ * + * <h3>Metrics</h3> + * <ul> + * <li> zookeeper operation stats are exposed under scope <code>zk</code> by + * {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient} + * <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by + * {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase} + * <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code> + * </ul> + */ +public class ZooKeeperClient { + + public static interface Credentials { + + Credentials NONE = new Credentials() { + @Override + public void authenticate(ZooKeeper zooKeeper) { + // noop + } + }; + + void authenticate(ZooKeeper zooKeeper); + } + + public static class DigestCredentials implements Credentials { + + String username; + String password; + + public DigestCredentials(String username, String password) { + this.username = username; + this.password = password; + } + + @Override + public void authenticate(ZooKeeper zooKeeper) { + zooKeeper.addAuthInfo("digest", String.format("%s:%s", username, password).getBytes(UTF_8)); + } + } + + public interface ZooKeeperSessionExpireNotifier { + void notifySessionExpired(); + } + + /** + * Indicates an error connecting to a zookeeper cluster. + */ + public static class ZooKeeperConnectionException extends IOException { + private static final long serialVersionUID = 6682391687004819361L; + + public ZooKeeperConnectionException(String message) { + super(message); + } + + public ZooKeeperConnectionException(String message, Throwable cause) { + super(message, cause); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName()); + + private final String name; + private final int sessionTimeoutMs; + private final int defaultConnectionTimeoutMs; + private final String zooKeeperServers; + // GuardedBy "this", but still volatile for tests, where we want to be able to see writes + // made from within long synchronized blocks. + private volatile ZooKeeper zooKeeper = null; + private final RetryPolicy retryPolicy; + private final StatsLogger statsLogger; + private final int retryThreadCount; + private final double requestRateLimit; + private final Credentials credentials; + private volatile boolean authenticated = false; + private Stopwatch disconnectedStopwatch = null; + + private boolean closed = false; + + final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>(); + + // watcher manager to manage watchers + private final ZKWatcherManager watcherManager; + + /** + * Creates an unconnected client that will lazily attempt to connect on the first call to + * {@link #get}. All successful connections will be authenticated with the given + * {@code credentials}. 
+ * + * @param sessionTimeoutMs + * ZK session timeout in milliseconds + * @param connectionTimeoutMs + * ZK connection timeout in milliseconds + * @param zooKeeperServers + * the set of servers forming the ZK cluster + */ + ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) { + this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0, + Credentials.NONE); + } + + ZooKeeperClient(String name, + int sessionTimeoutMs, + int connectionTimeoutMs, + String zooKeeperServers, + RetryPolicy retryPolicy, + StatsLogger statsLogger, + int retryThreadCount, + double requestRateLimit, + Credentials credentials) { + this.name = name; + this.sessionTimeoutMs = sessionTimeoutMs; + this.zooKeeperServers = zooKeeperServers; + this.defaultConnectionTimeoutMs = connectionTimeoutMs; + this.retryPolicy = retryPolicy; + this.statsLogger = statsLogger; + this.retryThreadCount = retryThreadCount; + this.requestRateLimit = requestRateLimit; + this.credentials = credentials; + this.watcherManager = ZKWatcherManager.newBuilder() + .name(name) + .zkc(this) + .statsLogger(statsLogger.scope("watcher_manager")) + .build(); + } + + public List<ACL> getDefaultACL() { + if (Credentials.NONE == credentials) { + return ZooDefs.Ids.OPEN_ACL_UNSAFE; + } else { + return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL; + } + } + + public ZKWatcherManager getWatcherManager() { + return watcherManager; + } + + /** + * Returns the current active ZK connection or establishes a new one if none has yet been + * established or a previous connection was disconnected or had its session time out. + * + * @return a connected ZooKeeper client + * @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster + * @throws InterruptedException if interrupted while waiting for a connection to be established + * @throws TimeoutException if a connection could not be established within the configured + * session timeout + */ + public synchronized ZooKeeper get() + throws ZooKeeperConnectionException, InterruptedException { + + try { + FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss); + } catch (IOException ioe) { + throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe); + } + + // This indicates that the client was explictly closed + if (closed) { + throw new ZooKeeperConnectionException("Client " + name + " has already been closed"); + } + + // the underneath zookeeper is retryable zookeeper + if (zooKeeper != null && retryPolicy != null) { + if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) { + // the zookeeper client is connected + disconnectedStopwatch = null; + } else { + if (disconnectedStopwatch == null) { + disconnectedStopwatch = Stopwatch.createStarted(); + } else { + long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS); + if (disconnectedMs > defaultConnectionTimeoutMs) { + closeInternal(); + authenticated = false; + } + } + } + } + + if (zooKeeper == null) { + zooKeeper = buildZooKeeper(); + disconnectedStopwatch = null; + } + + // In case authenticate throws an exception, the caller can try to recover the client by + // calling get again. 
+ if (!authenticated) { + credentials.authenticate(zooKeeper); + authenticated = true; + } + + return zooKeeper; + } + + private ZooKeeper buildZooKeeper() + throws ZooKeeperConnectionException, InterruptedException { + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case None: + switch (event.getState()) { + case Expired: + if (null == retryPolicy) { + LOG.info("ZooKeeper {}' session expired. Event: {}", name, event); + closeInternal(); + } + authenticated = false; + break; + case Disconnected: + if (null == retryPolicy) { + LOG.info("ZooKeeper {} is disconnected from zookeeper now," + + " but it is OK unless we received EXPIRED event.", name); + } + // Mark as not authenticated if expired or disconnected. In both cases + // we lose any attached auth info. Relying on Expired/Disconnected is + // sufficient since all Expired/Disconnected events are processed before + // all SyncConnected events, and the underlying member is not updated until + // SyncConnected is received. + authenticated = false; + break; + default: + break; + } + } + + try { + for (Watcher watcher : watchers) { + try { + watcher.process(event); + } catch (Throwable t) { + LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t); + } + } + } catch (Throwable t) { + LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t); + } + } + }; + + Set<Watcher> watchers = new HashSet<Watcher>(); + watchers.add(watcher); + + ZooKeeper zk; + try { + RetryPolicy opRetryPolicy = null == retryPolicy ? + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy; + RetryPolicy connectRetryPolicy = null == retryPolicy ? + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : + new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE); + zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder() + .connectString(zooKeeperServers) + .sessionTimeoutMs(sessionTimeoutMs) + .watchers(watchers) + .operationRetryPolicy(opRetryPolicy) + .connectRetryPolicy(connectRetryPolicy) + .statsLogger(statsLogger) + .retryThreadCount(retryThreadCount) + .requestRateLimit(requestRateLimit) + .build(); + } catch (KeeperException e) { + throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e); + } catch (IOException e) { + throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e); + } + return zk; + } + + /** + * Clients that need to re-establish state after session expiration can register an + * {@code onExpired} command to execute. + * + * @param onExpired the {@code Command} to register + * @return the new {@link Watcher} which can later be passed to {@link #unregister} for + * removal. + */ + public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) { + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) { + try { + onExpired.notifySessionExpired(); + } catch (Exception exc) { + // do nothing + } + } + } + }; + register(watcher); + return watcher; + } + + /** + * Clients that need to register a top-level {@code Watcher} should do so using this method. The + * registered {@code watcher} will remain registered across re-connects and session expiration + * events. 
+ * + * @param watcher the {@code Watcher to register} + */ + public void register(Watcher watcher) { + if (null != watcher) { + watchers.add(watcher); + } + } + + /** + * Clients can attempt to unregister a top-level {@code Watcher} that has previously been + * registered. + * + * @param watcher the {@code Watcher} to unregister as a top-level, persistent watch + * @return whether the given {@code Watcher} was found and removed from the active set + */ + public boolean unregister(Watcher watcher) { + return null != watcher && watchers.remove(watcher); + } + + /** + * Closes the current connection if any expiring the current ZooKeeper session. Any subsequent + * calls to this method will no-op until the next successful {@link #get}. + */ + public synchronized void closeInternal() { + if (zooKeeper != null) { + try { + LOG.info("Closing zookeeper client {}.", name); + zooKeeper.close(); + LOG.info("Closed zookeeper client {}.", name); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e); + } finally { + zooKeeper = null; + } + } + } + + /** + * Closes the the underlying zookeeper instance. + * Subsequent attempts to {@link #get} will fail + */ + public synchronized void close() { + if (closed) { + return; + } + LOG.info("Close zookeeper client {}.", name); + closeInternal(); + // unregister gauges to prevent GC spiral + this.watcherManager.unregisterGauges(); + closed = true; + } +}
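As a quick orientation for reviewers, the sketch below shows one way a caller inside the org.apache.distributedlog package might exercise the wrapper above: connect lazily via get(), react to session expiry, and shut down. This is illustrative only and is not part of the patch; the three-argument constructor used here is package-private, and the ZooKeeper server list is a placeholder.

package org.apache.distributedlog;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Illustrative usage sketch only (not part of this patch).
class ZooKeeperClientUsageSketch {
    void example() throws Exception {
        // Package-private constructor: session timeout, connection timeout, connect string (placeholder servers).
        ZooKeeperClient zkc = new ZooKeeperClient(30000, 30000, "zk1:2181,zk2:2181,zk3:2181");

        // React to session expiry, e.g. to re-create any session-scoped state.
        Watcher expiryWatcher = zkc.registerExpirationHandler(new ZooKeeperClient.ZooKeeperSessionExpireNotifier() {
            @Override
            public void notifySessionExpired() {
                // re-establish ephemeral/session-scoped state here
            }
        });

        // get() establishes (and authenticates) the connection on first use.
        ZooKeeper zk = zkc.get();
        zk.exists("/distributedlog", false);

        // Unregister the watcher and expire the session when done.
        zkc.unregister(expiryWatcher);
        zkc.close();
    }
}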
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java new file mode 100644 index 0000000..0c200ce --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ZooKeeperClientBuilder.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Preconditions; +import org.apache.distributedlog.ZooKeeperClient.Credentials; +import org.apache.distributedlog.ZooKeeperClient.DigestCredentials; +import org.apache.distributedlog.impl.BKNamespaceDriver; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.RetryPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +/** + * Builder to build zookeeper client. + */ +public class ZooKeeperClientBuilder { + + static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClientBuilder.class); + + /** + * Create a zookeeper client builder to build zookeeper clients. + * + * @return zookeeper client builder. + */ + public static ZooKeeperClientBuilder newBuilder() { + return new ZooKeeperClientBuilder(); + } + + // name + private String name = "default"; + // sessionTimeoutMs + private int sessionTimeoutMs = -1; + // conectionTimeoutMs + private int conectionTimeoutMs = -1; + // zkServers + private String zkServers = null; + // retry policy + private RetryPolicy retryPolicy = null; + // stats logger + private StatsLogger statsLogger = NullStatsLogger.INSTANCE; + // retry executor thread count + private int retryThreadCount = 1; + // zookeeper access requestRateLimit limit + private double requestRateLimit = 0; + // Did call the zkAclId setter on the builder, used to ensure the setter is set. + private boolean zkAclIdSet = false; + private String zkAclId; + + // Cached ZooKeeper Client + private ZooKeeperClient cachedClient = null; + + private ZooKeeperClientBuilder() {} + + /** + * Set zookeeper client name + * + * @param name zookeeper client name + * @return zookeeper client builder + */ + public synchronized ZooKeeperClientBuilder name(String name) { + this.name = name; + return this; + } + + /** + * Set zookeeper session timeout in milliseconds. + * + * @param sessionTimeoutMs + * session timeout in milliseconds. + * @return zookeeper client builder. 
+ */ + public synchronized ZooKeeperClientBuilder sessionTimeoutMs(int sessionTimeoutMs) { + this.sessionTimeoutMs = sessionTimeoutMs; + if (this.conectionTimeoutMs <= 0) { + this.conectionTimeoutMs = 2 * sessionTimeoutMs; + } + return this; + } + + public synchronized ZooKeeperClientBuilder retryThreadCount(int retryThreadCount) { + this.retryThreadCount = retryThreadCount; + return this; + } + + public synchronized ZooKeeperClientBuilder requestRateLimit(double requestRateLimit) { + this.requestRateLimit = requestRateLimit; + return this; + } + + /** + * Set zookeeper connection timeout in milliseconds + * + * @param connectionTimeoutMs + * connection timeout ms. + * @return builder + */ + public synchronized ZooKeeperClientBuilder connectionTimeoutMs(int connectionTimeoutMs) { + this.conectionTimeoutMs = connectionTimeoutMs; + return this; + } + + /** + * Set ZooKeeper Connect String. + * + * @param zkServers + * zookeeper servers to connect. + * @return builder + */ + public synchronized ZooKeeperClientBuilder zkServers(String zkServers) { + this.zkServers = zkServers; + return this; + } + + /** + * Set DistributedLog URI. + * + * @param uri + * distributedlog uri. + * @return builder. + */ + public synchronized ZooKeeperClientBuilder uri(URI uri) { + this.zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); + return this; + } + + /** + * Build zookeeper client using existing <i>zkc</i> client. + * + * @param zkc + * zookeeper client. + * @return builder + */ + public synchronized ZooKeeperClientBuilder zkc(ZooKeeperClient zkc) { + this.cachedClient = zkc; + return this; + } + + /** + * Build zookeeper client with given retry policy <i>retryPolicy</i>. + * + * @param retryPolicy + * retry policy + * @return builder + */ + public synchronized ZooKeeperClientBuilder retryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Build zookeeper client with given stats logger <i>statsLogger</i>. + * + * @param statsLogger + * stats logger to expose zookeeper stats + * @return builder + */ + public synchronized ZooKeeperClientBuilder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + /** + * * Build zookeeper client with given zk acl digest id <i>zkAclId</i>. + */ + public synchronized ZooKeeperClientBuilder zkAclId(String zkAclId) { + this.zkAclIdSet = true; + this.zkAclId = zkAclId; + return this; + } + + private void validateParameters() { + Preconditions.checkNotNull(zkServers, "No zk servers provided."); + Preconditions.checkArgument(conectionTimeoutMs > 0, + "Invalid connection timeout : %d", conectionTimeoutMs); + Preconditions.checkArgument(sessionTimeoutMs > 0, + "Invalid session timeout : %d", sessionTimeoutMs); + Preconditions.checkNotNull(statsLogger, "No stats logger provided."); + Preconditions.checkArgument(zkAclIdSet, "Zookeeper acl id not set."); + } + + /** + * Build a zookeeper client. + * + * @return zookeeper client. 
+ */ + public synchronized ZooKeeperClient build() { + if (null == cachedClient) { + cachedClient = buildClient(); + } + return cachedClient; + } + + private ZooKeeperClient buildClient() { + validateParameters(); + + Credentials credentials = Credentials.NONE; + if (null != zkAclId) { + credentials = new DigestCredentials(zkAclId, zkAclId); + } + + return new ZooKeeperClient( + name, + sessionTimeoutMs, + conectionTimeoutMs, + zkServers, + retryPolicy, + statsLogger, + retryThreadCount, + requestRateLimit, + credentials + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java new file mode 100644 index 0000000..2c3e738 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/AccessControlManager.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.acl; + +/** + * Access Control on stream operations + */ +public interface AccessControlManager { + + /** + * Whether allowing writing to a stream. + * + * @param stream + * Stream to write + * @return true if allowing writing to the given stream, otherwise false. + */ + boolean allowWrite(String stream); + + /** + * Whether allowing truncating a given stream. + * + * @param stream + * Stream to truncate + * @return true if allowing truncating a given stream. + */ + boolean allowTruncate(String stream); + + /** + * Whether allowing deleting a given stream. + * + * @param stream + * Stream to delete + * @return true if allowing deleting a given stream. + */ + boolean allowDelete(String stream); + + /** + * Whether allowing proxies to acquire a given stream. + * + * @param stream + * stream to acquire + * @return true if allowing proxies to acquire the given stream. + */ + boolean allowAcquire(String stream); + + /** + * Whether allowing proxies to release ownership for a given stream. + * + * @param stream + * stream to release + * @return true if allowing proxies to release a given stream. + */ + boolean allowRelease(String stream); + + /** + * Close the access control manager. 
+ */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java new file mode 100644 index 0000000..bf3352a --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/DefaultAccessControlManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.acl; + +public class DefaultAccessControlManager implements AccessControlManager { + + public static final DefaultAccessControlManager INSTANCE = new DefaultAccessControlManager(); + + private DefaultAccessControlManager() { + } + + @Override + public boolean allowWrite(String stream) { + return true; + } + + @Override + public boolean allowTruncate(String stream) { + return true; + } + + @Override + public boolean allowDelete(String stream) { + return true; + } + + @Override + public boolean allowAcquire(String stream) { + return true; + } + + @Override + public boolean allowRelease(String stream) { + return true; + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java new file mode 100644 index 0000000..4218bfc --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/acl/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Access Control for distributedlog streams. + */ +package org.apache.distributedlog.acl; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java new file mode 100644 index 0000000..4e94984 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java @@ -0,0 +1,921 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.admin; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.LogSegmentMetadata; +import org.apache.distributedlog.ReadUtils; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.ZooKeeperClientBuilder; +import org.apache.distributedlog.impl.BKNamespaceDriver; +import org.apache.distributedlog.impl.acl.ZKAccessControl; +import org.apache.distributedlog.exceptions.DLIllegalStateException; +import org.apache.distributedlog.impl.federated.FederatedZKLogMetadataStore; +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.impl.metadata.BKDLConfig; +import org.apache.distributedlog.metadata.DLMetadata; +import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; +import org.apache.distributedlog.metadata.MetadataUpdater; +import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.thrift.AccessControlEntry; +import org.apache.distributedlog.tools.DistributedLogTool; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.SchedulerUtils; +import com.twitter.util.Await; +import com.twitter.util.Function; +import com.twitter.util.Future; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.IOUtils; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.zookeeper.KeeperException; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Admin Tool for DistributedLog. + */ +public class DistributedLogAdmin extends DistributedLogTool { + + static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class); + + /** + * Fix inprogress segment with lower ledger sequence number. + * + * @param namespace + * dl namespace + * @param metadataUpdater + * metadata updater. + * @param streamName + * stream name. + * @param verbose + * print verbose messages. + * @param interactive + * is confirmation needed before executing actual action. + * @throws IOException + */ + public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace, + final MetadataUpdater metadataUpdater, + final String streamName, + final boolean verbose, + final boolean interactive) throws IOException { + DistributedLogManager dlm = namespace.openLog(streamName); + try { + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + if (verbose) { + System.out.println("LogSegments for " + streamName + " : "); + for (LogSegmentMetadata segment : segments) { + System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment); + } + } + LOG.info("Get log segments for {} : {}", streamName, segments); + // validate log segments + long maxCompletedLogSegmentSequenceNumber = -1L; + LogSegmentMetadata inprogressSegment = null; + for (LogSegmentMetadata segment : segments) { + if (!segment.isInProgress()) { + maxCompletedLogSegmentSequenceNumber = Math.max(maxCompletedLogSegmentSequenceNumber, segment.getLogSegmentSequenceNumber()); + } else { + // we already found an inprogress segment + if (null != inprogressSegment) { + throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments); + } + inprogressSegment = segment; + } + } + if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedLogSegmentSequenceNumber) { + // nothing to fix + return; + } + final long newLogSegmentSequenceNumber = maxCompletedLogSegmentSequenceNumber + 1; + if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) { + return; + } + final LogSegmentMetadata newSegment = + FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newLogSegmentSequenceNumber)); + LOG.info("Fixed {} : {} -> {} ", + new Object[] { streamName, inprogressSegment, newSegment }); + if (verbose) { + System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName() + + " -> " + newSegment.getZNodeName()); + System.out.println("\t old: " + inprogressSegment); + System.out.println("\t new: " + newSegment); + System.out.println(); + } + } finally { + dlm.close(); + } + } + + private static class LogSegmentCandidate { + final LogSegmentMetadata metadata; + final LogRecordWithDLSN lastRecord; + + LogSegmentCandidate(LogSegmentMetadata metadata, 
LogRecordWithDLSN lastRecord) { + this.metadata = metadata; + this.lastRecord = lastRecord; + } + + @Override + public String toString() { + return "LogSegmentCandidate[ metadata = " + metadata + ", last record = " + lastRecord + " ]"; + } + + } + + private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR = + new Comparator<LogSegmentCandidate>() { + @Override + public int compare(LogSegmentCandidate o1, LogSegmentCandidate o2) { + return LogSegmentMetadata.COMPARATOR.compare(o1.metadata, o2.metadata); + } + }; + + private static class StreamCandidate { + + final String streamName; + final SortedSet<LogSegmentCandidate> segmentCandidates = + new TreeSet<LogSegmentCandidate>(LOG_SEGMENT_CANDIDATE_COMPARATOR); + + StreamCandidate(String streamName) { + this.streamName = streamName; + } + + synchronized void addLogSegmentCandidate(LogSegmentCandidate segmentCandidate) { + segmentCandidates.add(segmentCandidate); + } + + @Override + public String toString() { + return "StreamCandidate[ name = " + streamName + ", segments = " + segmentCandidates + " ]"; + } + } + + public static void checkAndRepairDLNamespace(final URI uri, + final DistributedLogNamespace namespace, + final MetadataUpdater metadataUpdater, + final OrderedScheduler scheduler, + final boolean verbose, + final boolean interactive) throws IOException { + checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1); + } + + public static void checkAndRepairDLNamespace(final URI uri, + final DistributedLogNamespace namespace, + final MetadataUpdater metadataUpdater, + final OrderedScheduler scheduler, + final boolean verbose, + final boolean interactive, + final int concurrency) throws IOException { + Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found."); + // 0. getting streams under a given uri. + Iterator<String> streamsIter = namespace.getLogs(); + List<String> streams = Lists.newArrayList(); + while (streamsIter.hasNext()) { + streams.add(streamsIter.next()); + } + if (verbose) { + System.out.println("- 0. checking streams under " + uri); + } + if (streams.size() == 0) { + System.out.println("+ 0. nothing to check. quit."); + return; + } + Map<String, StreamCandidate> streamCandidates = + checkStreams(namespace, streams, scheduler, concurrency); + if (verbose) { + System.out.println("+ 0. " + streamCandidates.size() + " corrupted streams found."); + } + if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + streamCandidates.size() + " corrupted streams (Y/N) : ")) { + return; + } + if (verbose) { + System.out.println("- 1. repairing " + streamCandidates.size() + " corrupted streams."); + } + for (StreamCandidate candidate : streamCandidates.values()) { + if (!repairStream(metadataUpdater, candidate, verbose, interactive)) { + if (verbose) { + System.out.println("* 1. aborted repairing corrupted streams."); + } + return; + } + } + if (verbose) { + System.out.println("+ 1. 
repaired " + streamCandidates.size() + " corrupted streams."); + } + } + + private static Map<String, StreamCandidate> checkStreams( + final DistributedLogNamespace namespace, + final Collection<String> streams, + final OrderedScheduler scheduler, + final int concurrency) throws IOException { + final LinkedBlockingQueue<String> streamQueue = + new LinkedBlockingQueue<String>(); + streamQueue.addAll(streams); + final Map<String, StreamCandidate> candidateMap = + new ConcurrentSkipListMap<String, StreamCandidate>(); + final AtomicInteger numPendingStreams = new AtomicInteger(streams.size()); + final CountDownLatch doneLatch = new CountDownLatch(1); + Runnable checkRunnable = new Runnable() { + @Override + public void run() { + while (!streamQueue.isEmpty()) { + String stream; + try { + stream = streamQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + StreamCandidate candidate; + try { + LOG.info("Checking stream {}.", stream); + candidate = checkStream(namespace, stream, scheduler); + LOG.info("Checked stream {} - {}.", stream, candidate); + } catch (IOException e) { + LOG.error("Error on checking stream {} : ", stream, e); + doneLatch.countDown(); + break; + } + if (null != candidate) { + candidateMap.put(stream, candidate); + } + if (numPendingStreams.decrementAndGet() == 0) { + doneLatch.countDown(); + } + } + } + }; + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + threads[i] = new Thread(checkRunnable, "check-thread-" + i); + threads[i].start(); + } + try { + doneLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (numPendingStreams.get() != 0) { + throw new IOException(numPendingStreams.get() + " streams left w/o checked"); + } + for (int i = 0; i < concurrency; i++) { + threads[i].interrupt(); + try { + threads[i].join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return candidateMap; + } + + private static StreamCandidate checkStream( + final DistributedLogNamespace namespace, + final String streamName, + final OrderedScheduler scheduler) throws IOException { + DistributedLogManager dlm = namespace.openLog(streamName); + try { + List<LogSegmentMetadata> segments = dlm.getLogSegments(); + if (segments.isEmpty()) { + return null; + } + List<Future<LogSegmentCandidate>> futures = + new ArrayList<Future<LogSegmentCandidate>>(segments.size()); + for (LogSegmentMetadata segment : segments) { + futures.add(checkLogSegment(namespace, streamName, segment, scheduler)); + } + List<LogSegmentCandidate> segmentCandidates; + try { + segmentCandidates = Await.result(Future.collect(futures)); + } catch (Exception e) { + throw new IOException("Failed on checking stream " + streamName, e); + } + StreamCandidate streamCandidate = new StreamCandidate(streamName); + for (LogSegmentCandidate segmentCandidate: segmentCandidates) { + if (null != segmentCandidate) { + streamCandidate.addLogSegmentCandidate(segmentCandidate); + } + } + if (streamCandidate.segmentCandidates.isEmpty()) { + return null; + } + return streamCandidate; + } finally { + dlm.close(); + } + } + + private static Future<LogSegmentCandidate> checkLogSegment( + final DistributedLogNamespace namespace, + final String streamName, + final LogSegmentMetadata metadata, + final OrderedScheduler scheduler) { + if (metadata.isInProgress()) { + return Future.value(null); + } + + final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver() + 
.getLogSegmentEntryStore(NamespaceDriver.Role.READER); + return ReadUtils.asyncReadLastRecord( + streamName, + metadata, + true, + false, + true, + 4, + 16, + new AtomicInteger(0), + scheduler, + entryStore + ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() { + @Override + public LogSegmentCandidate apply(LogRecordWithDLSN record) { + if (null != record && + (record.getDlsn().compareTo(metadata.getLastDLSN()) > 0 || + record.getTransactionId() > metadata.getLastTxId() || + !metadata.isRecordPositionWithinSegmentScope(record))) { + return new LogSegmentCandidate(metadata, record); + } else { + return null; + } + } + }); + } + + private static boolean repairStream(MetadataUpdater metadataUpdater, + StreamCandidate streamCandidate, + boolean verbose, + boolean interactive) throws IOException { + if (verbose) { + System.out.println("Stream " + streamCandidate.streamName + " : "); + for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) { + System.out.println(" " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + + " : metadata = " + segmentCandidate.metadata + ", last dlsn = " + + segmentCandidate.lastRecord.getDlsn()); + } + System.out.println("-------------------------------------------"); + } + if (interactive && !IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ")) { + return false; + } + for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) { + LogSegmentMetadata newMetadata = FutureUtils.result( + metadataUpdater.updateLastRecord(segmentCandidate.metadata, segmentCandidate.lastRecord)); + if (verbose) { + System.out.println(" Fixed segment " + segmentCandidate.metadata.getLogSegmentSequenceNumber() + " : "); + System.out.println(" old metadata : " + segmentCandidate.metadata); + System.out.println(" new metadata : " + newMetadata); + } + } + if (verbose) { + System.out.println("-------------------------------------------"); + } + return true; + } + + // + // Commands + // + + /** + * Unbind the bookkeeper environment for a given distributedlog uri. + * + * TODO: move unbind operation to namespace driver + */ + class UnbindCommand extends OptsCommand { + + Options options = new Options(); + + UnbindCommand() { + super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance."); + options.addOption("f", "force", false, "Force unbinding without prompt."); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected String getUsage() { + return "unbind [options] <distributedlog uri>"; + } + + @Override + protected int runCmd(CommandLine cmdline) throws Exception { + String[] args = cmdline.getArgs(); + if (args.length <= 0) { + System.err.println("No distributedlog uri specified."); + printUsage(); + return -1; + } + boolean force = cmdline.hasOption("f"); + URI uri = URI.create(args[0]); + // resolving the uri to see if there is another bindings in this uri. 
+ ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri) + .sessionTimeoutMs(10000).build(); + BKDLConfig bkdlConfig; + try { + bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); + } catch (IOException ie) { + bkdlConfig = null; + } + if (null == bkdlConfig) { + System.out.println("No bookkeeper is bound to " + uri); + return 0; + } else { + System.out.println("There is bookkeeper bound to " + uri + " : "); + System.out.println(""); + System.out.println(bkdlConfig.toString()); + System.out.println(""); + if (!force && !IOUtils.confirmPrompt("Do you want to unbind " + uri + " :\n")) { + return 0; + } + } + DLMetadata.unbind(uri); + System.out.println("Unbound on " + uri + "."); + return 0; + } + } + + /** + * Bind Command to bind bookkeeper environment for a given distributed uri. + * + * TODO: move bind to namespace driver + */ + class BindCommand extends OptsCommand { + + Options options = new Options(); + + BindCommand() { + super("bind", "bind the bookkeeper environment settings for a given distributedlog instance."); + options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance."); + options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers."); + options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers."); + options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers."); + options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers."); + options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id."); + options.addOption("r", "encodeRegionID", true, "Flag to encode region id."); + options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade"); + options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace"); + options.addOption("f", "force", false, "Force binding without prompt."); + options.addOption("c", "creation", false, "Whether is it a creation binding."); + options.addOption("q", "query", false, "Query the bookkeeper bindings"); + } + + @Override + protected Options getOptions() { + return options; + } + + @Override + protected String getUsage() { + return "bind [options] <distributedlog uri>"; + } + + @Override + protected int runCmd(CommandLine cmdline) throws Exception { + boolean isQuery = cmdline.hasOption("q"); + if (!isQuery && (!cmdline.hasOption("l") || !cmdline.hasOption("s"))) { + System.err.println("Error: Neither zkServers nor ledgersPath specified for bookkeeper environment."); + printUsage(); + return -1; + } + String[] args = cmdline.getArgs(); + if (args.length <= 0) { + System.err.println("No distributedlog uri specified."); + printUsage(); + return -1; + } + boolean force = cmdline.hasOption("f"); + boolean creation = cmdline.hasOption("c"); + String bkLedgersPath = cmdline.getOptionValue("l"); + String bkZkServersForWriter = cmdline.getOptionValue("s"); + boolean sanityCheckTxnID = + !cmdline.hasOption("i") || Boolean.parseBoolean(cmdline.getOptionValue("i")); + boolean encodeRegionID = + cmdline.hasOption("r") && Boolean.parseBoolean(cmdline.getOptionValue("r")); + + String bkZkServersForReader; + if (cmdline.hasOption("bkzr")) { + bkZkServersForReader = cmdline.getOptionValue("bkzr"); + } else { + bkZkServersForReader = bkZkServersForWriter; + } + + URI uri = URI.create(args[0]); + + String 
dlZkServersForWriter; + String dlZkServersForReader; + if (cmdline.hasOption("dlzw")) { + dlZkServersForWriter = cmdline.getOptionValue("dlzw"); + } else { + dlZkServersForWriter = BKNamespaceDriver.getZKServersFromDLUri(uri); + } + if (cmdline.hasOption("dlzr")) { + dlZkServersForReader = cmdline.getOptionValue("dlzr"); + } else { + dlZkServersForReader = dlZkServersForWriter; + } + + // resolving the uri to see if there is another bindings in this uri. + ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().uri(uri).zkAclId(null) + .sessionTimeoutMs(10000).build(); + try { + BKDLConfig newBKDLConfig = + new BKDLConfig(dlZkServersForWriter, dlZkServersForReader, + bkZkServersForWriter, bkZkServersForReader, bkLedgersPath) + .setSanityCheckTxnID(sanityCheckTxnID) + .setEncodeRegionID(encodeRegionID); + + if (cmdline.hasOption("seqno")) { + newBKDLConfig = newBKDLConfig.setFirstLogSegmentSeqNo(Long.parseLong(cmdline.getOptionValue("seqno"))); + } + + if (cmdline.hasOption("fns")) { + newBKDLConfig = newBKDLConfig.setFederatedNamespace(true); + } + + BKDLConfig bkdlConfig; + try { + bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); + } catch (IOException ie) { + bkdlConfig = null; + } + if (null == bkdlConfig) { + System.out.println("No bookkeeper is bound to " + uri); + } else { + System.out.println("There is bookkeeper bound to " + uri + " : "); + System.out.println(""); + System.out.println(bkdlConfig.toString()); + System.out.println(""); + if (!isQuery) { + if (newBKDLConfig.equals(bkdlConfig)) { + System.out.println("No bookkeeper binding needs to be updated. Quit."); + return 0; + } else if(!newBKDLConfig.isFederatedNamespace() && bkdlConfig.isFederatedNamespace()) { + System.out.println("You can't turn a federated namespace back to non-federated."); + return 0; + } else { + if (!force && !IOUtils.confirmPrompt("Do you want to bind " + uri + + " with new bookkeeper instance :\n" + newBKDLConfig)) { + return 0; + } + } + } + } + if (isQuery) { + System.out.println("Done."); + return 0; + } + DLMetadata dlMetadata = DLMetadata.create(newBKDLConfig); + if (creation) { + try { + dlMetadata.create(uri); + System.out.println("Created binding on " + uri + "."); + } catch (IOException ie) { + System.err.println("Failed to create binding on " + uri + " : " + ie.getMessage()); + } + } else { + try { + dlMetadata.update(uri); + System.out.println("Updated binding on " + uri + " : "); + System.out.println(""); + System.out.println(newBKDLConfig.toString()); + System.out.println(""); + } catch (IOException ie) { + System.err.println("Failed to update binding on " + uri + " : " + ie.getMessage()); + } + } + if (newBKDLConfig.isFederatedNamespace()) { + try { + FederatedZKLogMetadataStore.createFederatedNamespace(uri, zkc); + } catch (KeeperException.NodeExistsException nee) { + // ignore node exists exception + } + } + return 0; + } finally { + zkc.close(); + } + } + } + + static class RepairSeqNoCommand extends PerDLCommand { + + boolean dryrun = false; + boolean verbose = false; + final List<String> streams = new ArrayList<String>(); + + RepairSeqNoCommand() { + super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number."); + options.addOption("d", "dryrun", false, "Dry run without repairing"); + options.addOption("l", "list", true, "List of streams to repair, separated by comma"); + options.addOption("v", "verbose", false, "Print verbose messages"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + 
super.parseCommandLine(cmdline); + dryrun = cmdline.hasOption("d"); + verbose = cmdline.hasOption("v"); + force = !dryrun && cmdline.hasOption("f"); + if (!cmdline.hasOption("l")) { + throw new ParseException("No streams provided to repair"); + } + String streamsList = cmdline.getOptionValue("l"); + Collections.addAll(streams, streamsList.split(",")); + } + + @Override + protected int runCmd() throws Exception { + MetadataUpdater metadataUpdater = dryrun ? + new DryrunLogSegmentMetadataStoreUpdater(getConf(), + getLogSegmentMetadataStore()) : + LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), + getLogSegmentMetadataStore()); + System.out.println("List of streams : "); + System.out.println(streams); + if (!IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):")) { + return -1; + } + for (String stream : streams) { + fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), metadataUpdater, stream, verbose, !getForce()); + } + return 0; + } + + @Override + protected String getUsage() { + return "repairseqno [options]"; + } + } + + static class DLCKCommand extends PerDLCommand { + + boolean dryrun = false; + boolean verbose = false; + int concurrency = 1; + + DLCKCommand() { + super("dlck", "Check and repair a distributedlog namespace"); + options.addOption("d", "dryrun", false, "Dry run without repairing"); + options.addOption("v", "verbose", false, "Print verbose messages"); + options.addOption("cy", "concurrency", true, "Concurrency on checking streams"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + dryrun = cmdline.hasOption("d"); + verbose = cmdline.hasOption("v"); + if (cmdline.hasOption("cy")) { + try { + concurrency = Integer.parseInt(cmdline.getOptionValue("cy")); + } catch (NumberFormatException nfe) { + throw new ParseException("Invalid concurrency value : " + cmdline.getOptionValue("cy")); + } + } + } + + @Override + protected int runCmd() throws Exception { + MetadataUpdater metadataUpdater = dryrun ? + new DryrunLogSegmentMetadataStoreUpdater(getConf(), + getLogSegmentMetadataStore()) : + LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), + getLogSegmentMetadataStore()); + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("dlck-scheduler") + .corePoolSize(Runtime.getRuntime().availableProcessors()) + .build(); + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler, + verbose, !getForce(), concurrency); + } finally { + SchedulerUtils.shutdownScheduler(executorService, 5, TimeUnit.MINUTES); + } + return 0; + } + + @Override + protected String getUsage() { + return "dlck [options]"; + } + } + + static class DeleteStreamACLCommand extends PerDLCommand { + + String stream = null; + + DeleteStreamACLCommand() { + super("delete_stream_acl", "Delete ACL for a given stream"); + options.addOption("s", "stream", true, "Stream to set ACL"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("s")) { + throw new ParseException("No stream to set ACL"); + } + stream = cmdline.getOptionValue("s"); + } + + @Override + protected int runCmd() throws Exception { + BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri()); + if (null == bkdlConfig.getACLRootPath()) { + // acl isn't enabled for this namespace. 
+ System.err.println("ACL isn't enabled for namespace " + getUri()); + return -1; + } + String zkPath = getUri() + "/" + bkdlConfig.getACLRootPath() + "/" + stream; + ZKAccessControl.delete(getZooKeeperClient(), zkPath); + return 0; + } + + @Override + protected String getUsage() { + return null; + } + } + + static class SetStreamACLCommand extends SetACLCommand { + + String stream = null; + + SetStreamACLCommand() { + super("set_stream_acl", "Set Default ACL for a given stream"); + options.addOption("s", "stream", true, "Stream to set ACL"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("s")) { + throw new ParseException("No stream to set ACL"); + } + stream = cmdline.getOptionValue("s"); + } + + @Override + protected String getZKPath(String zkRootPath) { + return zkRootPath + "/" + stream; + } + + @Override + protected String getUsage() { + return "set_stream_acl [options]"; + } + } + + static class SetDefaultACLCommand extends SetACLCommand { + + SetDefaultACLCommand() { + super("set_default_acl", "Set Default ACL for a namespace"); + } + + @Override + protected String getZKPath(String zkRootPath) { + return zkRootPath; + } + + @Override + protected String getUsage() { + return "set_default_acl [options]"; + } + } + + static abstract class SetACLCommand extends PerDLCommand { + + boolean denyWrite = false; + boolean denyTruncate = false; + boolean denyDelete = false; + boolean denyAcquire = false; + boolean denyRelease = false; + + protected SetACLCommand(String name, String description) { + super(name, description); + options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests"); + options.addOption("dt", "deny-truncate", false, "Deny truncate requests"); + options.addOption("dd", "deny-delete", false, "Deny delete requests"); + options.addOption("da", "deny-acquire", false, "Deny acquire requests"); + options.addOption("dr", "deny-release", false, "Deny release requests"); + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + denyWrite = cmdline.hasOption("dw"); + denyTruncate = cmdline.hasOption("dt"); + denyDelete = cmdline.hasOption("dd"); + denyAcquire = cmdline.hasOption("da"); + denyRelease = cmdline.hasOption("dr"); + } + + protected abstract String getZKPath(String zkRootPath); + + protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception { + ZKAccessControl accessControl; + try { + accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + } catch (KeeperException.NoNodeException nne) { + accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath); + } + return accessControl; + } + + protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception { + String zkPath = accessControl.getZKPath(); + if (null == zkc.get().exists(zkPath, false)) { + accessControl.create(zkc); + } else { + accessControl.update(zkc); + } + } + + @Override + protected int runCmd() throws Exception { + BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri()); + if (null == bkdlConfig.getACLRootPath()) { + // acl isn't enabled for this namespace. 
+ System.err.println("ACL isn't enabled for namespace " + getUri()); + return -1; + } + String zkPath = getZKPath(getUri().getPath() + "/" + bkdlConfig.getACLRootPath()); + ZKAccessControl accessControl = getZKAccessControl(getZooKeeperClient(), zkPath); + AccessControlEntry acl = accessControl.getAccessControlEntry(); + acl.setDenyWrite(denyWrite); + acl.setDenyTruncate(denyTruncate); + acl.setDenyDelete(denyDelete); + acl.setDenyAcquire(denyAcquire); + acl.setDenyRelease(denyRelease); + setZKAccessControl(getZooKeeperClient(), accessControl); + return 0; + } + + } + + public DistributedLogAdmin() { + super(); + commands.clear(); + addCommand(new HelpCommand()); + addCommand(new BindCommand()); + addCommand(new UnbindCommand()); + addCommand(new RepairSeqNoCommand()); + addCommand(new DLCKCommand()); + addCommand(new SetDefaultACLCommand()); + addCommand(new SetStreamACLCommand()); + addCommand(new DeleteStreamACLCommand()); + } + + @Override + protected String getName() { + return "dlog_admin"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java new file mode 100644 index 0000000..d708111 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Admin Tools for DistributedLog + */ +package org.apache.distributedlog.admin;