http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java deleted file mode 100644 index 862f05a..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.placement; - -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.util.Utils; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.List; -import java.util.TreeSet; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Transaction; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to - * avoid necessitating an additional system for the resource placement. - */ -public class ZKPlacementStateManager implements PlacementStateManager { - - private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class); - - private static final String SERVER_LOAD_DIR = "/.server-load"; - - private final String serverLoadPath; - private final ZooKeeperClient zkClient; - - private boolean watching = false; - - public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { - String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); - zkClient = BKNamespaceDriver.createZKClientBuilder( - String.format("ZKPlacementStateManager-%s", zkServers), - conf, - zkServers, - statsLogger.scope("placement_state_manager")).build(); - serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; - } - - private void createServerLoadPathIfNoExists(byte[] data) - throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { - try { - Utils.zkCreateFullPathOptimistic( - zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - logger.debug("the server load path {} is already created by others", serverLoadPath, nee); - } - } - - @Override - public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException { - logger.info("saving ownership"); - try { - ZooKeeper zk = zkClient.get(); - // use timestamp as data so watchers will see any changes - byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); - - if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist - createServerLoadPathIfNoExists(timestamp); - } - - Transaction tx = zk.transaction(); - List<String> children = zk.getChildren(serverLoadPath, false); - HashSet<String> servers = new HashSet<String>(children); - tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated - for (ServerLoad serverLoad : serverLoads) { - String server = serverToZkFormat(serverLoad.getServer()); - String serverPath = serverPath(server); - if (servers.contains(server)) { - servers.remove(server); - tx.setData(serverPath, serverLoad.serialize(), -1); - } else { - tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT); - } - } - for (String server : servers) { - tx.delete(serverPath(server), -1); - } - tx.commit(); - } catch (InterruptedException | IOException | KeeperException e) { - throw new StateManagerSaveException(e); - } - } - - @Override - public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException { - TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); - try { - ZooKeeper zk = zkClient.get(); - List<String> children = zk.getChildren(serverLoadPath, false); - for (String server : children) { - ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat()))); - } - return ownerships; - } catch (InterruptedException | IOException | KeeperException e) { - throw new StateManagerLoadException(e); - } - } - - @Override - public synchronized void watch(final PlacementCallback callback) { - if (watching) { - return; // do not double watch - } - watching = true; - - try { - ZooKeeper zk = zkClient.get(); - try { - zk.getData(serverLoadPath, new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { - try { - callback.callback(loadOwnership()); - } catch (StateManagerLoadException e) { - logger.error("Watch of Ownership failed", e); - } finally { - watching = false; - watch(callback); - } - } - }, new Stat()); - } catch (KeeperException.NoNodeException nee) { - byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); - createServerLoadPathIfNoExists(timestamp); - watching = false; - watch(callback); - } - } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { - logger.error("Watch of Ownership failed", e); - watching = false; - watch(callback); - } - } - - public String serverPath(String server) { - return String.format("%s/%s", serverLoadPath, server); - } - - protected String serverToZkFormat(String server) { - return server.replaceAll("/", "--"); - } - - protected String zkFormatToServer(String zkFormattedServer) { - return zkFormattedServer.replaceAll("--", "/"); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java deleted file mode 100644 index ea79251..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Placement Policy to place streams across proxy services. - */ -package org.apache.distributedlog.service.placement; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java deleted file mode 100644 index 83ac668..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.exceptions.ChecksumFailedException; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Try; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; - -/** - * Abstract Stream Operation. - */ -public abstract class AbstractStreamOp<Response> implements StreamOp { - - private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class); - - protected final String stream; - protected final OpStatsLogger opStatsLogger; - private final Promise<Response> result = new Promise<Response>(); - protected final Stopwatch stopwatch = Stopwatch.createUnstarted(); - protected final Long checksum; - protected final Feature checksumDisabledFeature; - - public AbstractStreamOp(String stream, - OpStatsLogger statsLogger, - Long checksum, - Feature checksumDisabledFeature) { - this.stream = stream; - this.opStatsLogger = statsLogger; - // start here in case the operation is failed before executing. - stopwatch.reset().start(); - this.checksum = checksum; - this.checksumDisabledFeature = checksumDisabledFeature; - } - - @Override - public String streamName() { - return stream; - } - - @Override - public Stopwatch stopwatch() { - return stopwatch; - } - - @Override - public void preExecute() throws DLException { - if (!checksumDisabledFeature.isAvailable() && null != checksum) { - Long serverChecksum = computeChecksum(); - if (null != serverChecksum && !checksum.equals(serverChecksum)) { - throw new ChecksumFailedException(); - } - } - } - - @Override - public Long computeChecksum() { - return null; - } - - @Override - public Future<Void> execute(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) { - stopwatch.reset().start(); - return executeOp(writer, sequencer, txnLock) - .addEventListener(new FutureEventListener<Response>() { - @Override - public void onSuccess(Response response) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - setResponse(response); - } - @Override - public void onFailure(Throwable cause) { - } - }).voided(); - } - - /** - * Fail with current <i>owner</i> and its reason <i>t</i>. - * - * @param cause - * failure reason - */ - @Override - public void fail(Throwable cause) { - if (cause instanceof OwnershipAcquireFailedException) { - // Ownership exception is a control exception, not an error, so we don't stat - // it with the other errors. - OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause; - fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner())); - } else { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - fail(ResponseUtils.exceptionToHeader(cause)); - } - } - - protected void setResponse(Response response) { - Return<Response> responseTry = new Return(response); - boolean isEmpty = result.updateIfEmpty(responseTry); - if (!isEmpty) { - Option<Try<Response>> resultTry = result.poll(); - logger.error("Result set multiple times. Value='{}', New='{}'", resultTry, responseTry); - } - } - - /** - * Return the full response, header and body. - * - * @return A future containing the response or the exception - * encountered by the op if it failed. - */ - public Future<Response> result() { - return result; - } - - /** - * Execute the operation and return its corresponding response. - * - * @param writer - * writer to execute the operation. - * @param sequencer - * sequencer used for generating transaction id for stream operations - * @param txnLock - * transaction lock to guarantee ordering of transaction id - * @return future representing the operation. - */ - protected abstract Future<Response> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock); - - // fail the result with the given response header - protected abstract void fail(ResponseHeader header); - - public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) { - return requestLogger(statsLogger).getOpStatsLogger(opName); - } - - public static StatsLogger requestLogger(StatsLogger statsLogger) { - return statsLogger.scope("request"); - } - - public static StatsLogger requestScope(StatsLogger statsLogger, String scope) { - return requestLogger(statsLogger).scope(scope); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java deleted file mode 100644 index 77c7d71..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ProtocolUtils; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.OpStatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Abstract Write Operation. - */ -public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> { - - protected AbstractWriteOp(String stream, - OpStatsLogger statsLogger, - Long checksum, - Feature checksumDisabledFeature) { - super(stream, statsLogger, checksum, checksumDisabledFeature); - } - - @Override - protected void fail(ResponseHeader header) { - setResponse(ResponseUtils.write(header)); - } - - @Override - public Long computeChecksum() { - return ProtocolUtils.streamOpCRC32(stream); - } - - @Override - public Future<ResponseHeader> responseHeader() { - return result().map(new AbstractFunction1<WriteResponse, ResponseHeader>() { - @Override - public ResponseHeader apply(WriteResponse response) { - return response.getHeader(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java deleted file mode 100644 index 6c98468..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.AlreadyClosedException; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.LockingException; -import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.thrift.service.BulkWriteResponse; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.ConstFuture; -import com.twitter.util.Future; -import com.twitter.util.Future$; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Try; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Bulk Write Operation. - */ -public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload { - private final List<ByteBuffer> buffers; - private final long payloadSize; - - // Stats - private final Counter deniedBulkWriteCounter; - private final Counter successRecordCounter; - private final Counter failureRecordCounter; - private final Counter redirectRecordCounter; - private final OpStatsLogger latencyStat; - private final Counter bytes; - private final Counter bulkWriteBytes; - - private final AccessControlManager accessControlManager; - - // We need to pass these through to preserve ownership change behavior in - // client/server. Only include failures which are guaranteed to have failed - // all subsequent writes. - private boolean isDefiniteFailure(Try<DLSN> result) { - boolean def = false; - try { - result.get(); - } catch (Exception ex) { - if (ex instanceof OwnershipAcquireFailedException - || ex instanceof AlreadyClosedException - || ex instanceof LockingException) { - def = true; - } - } - return def; - } - - public BulkWriteOp(String stream, - List<ByteBuffer> buffers, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - StreamPartitionConverter streamPartitionConverter, - Long checksum, - Feature checksumDisabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "bulkWrite"), checksum, checksumDisabledFeature); - this.buffers = buffers; - long total = 0; - // We do this here because the bytebuffers are mutable. - for (ByteBuffer bb : buffers) { - total += bb.remaining(); - } - this.payloadSize = total; - - final Partition partition = streamPartitionConverter.convert(stream); - // Write record stats - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite"); - this.successRecordCounter = streamOpStats.recordsCounter("success"); - this.failureRecordCounter = streamOpStats.recordsCounter("failure"); - this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); - this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes"); - this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite"); - this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes"); - - this.accessControlManager = accessControlManager; - - final long size = getPayloadSize(); - result().addEventListener(new FutureEventListener<BulkWriteResponse>() { - @Override - public void onSuccess(BulkWriteResponse response) { - if (response.getHeader().getCode() == StatusCode.SUCCESS) { - latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - bytes.add(size); - bulkWriteBytes.add(size); - } else { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - } - } - @Override - public void onFailure(Throwable cause) { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); - } - }); - } - - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowWrite(stream)) { - deniedBulkWriteCounter.inc(); - throw new RequestDeniedException(stream, "bulkWrite"); - } - super.preExecute(); - } - - @Override - public long getPayloadSize() { - return payloadSize; - } - - @Override - protected Future<BulkWriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - // Need to convert input buffers to LogRecords. - List<LogRecord> records; - Future<List<Future<DLSN>>> futureList; - synchronized (txnLock) { - records = asRecordList(buffers, sequencer); - futureList = writer.writeBulk(records); - } - - // Collect into a list of tries to make it easier to extract exception or DLSN. - Future<List<Try<DLSN>>> writes = asTryList(futureList); - - Future<BulkWriteResponse> response = writes.flatMap( - new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() { - @Override - public Future<BulkWriteResponse> apply(List<Try<DLSN>> results) { - - // Considered a success at batch level even if no individual writes succeeed. - // The reason is that its impossible to make an appropriate decision re retries without - // individual buffer failure reasons. - List<WriteResponse> writeResponses = new ArrayList<WriteResponse>(results.size()); - BulkWriteResponse bulkWriteResponse = - ResponseUtils.bulkWriteSuccess().setWriteResponses(writeResponses); - - // Promote the first result to an op-level failure if we're sure all other writes have - // failed. - if (results.size() > 0) { - Try<DLSN> firstResult = results.get(0); - if (isDefiniteFailure(firstResult)) { - return new ConstFuture(firstResult); - } - } - - // Translate all futures to write responses. - Iterator<Try<DLSN>> iterator = results.iterator(); - while (iterator.hasNext()) { - Try<DLSN> completedFuture = iterator.next(); - try { - DLSN dlsn = completedFuture.get(); - WriteResponse writeResponse = ResponseUtils.writeSuccess().setDlsn(dlsn.serialize()); - writeResponses.add(writeResponse); - successRecordCounter.inc(); - } catch (Exception ioe) { - WriteResponse writeResponse = ResponseUtils.write(ResponseUtils.exceptionToHeader(ioe)); - writeResponses.add(writeResponse); - if (StatusCode.FOUND == writeResponse.getHeader().getCode()) { - redirectRecordCounter.inc(); - } else { - failureRecordCounter.inc(); - } - } - } - - return Future.value(bulkWriteResponse); - } - } - ); - - return response; - } - - private List<LogRecord> asRecordList(List<ByteBuffer> buffers, Sequencer sequencer) { - List<LogRecord> records = new ArrayList<LogRecord>(buffers.size()); - for (ByteBuffer buffer : buffers) { - byte[] payload = new byte[buffer.remaining()]; - buffer.get(payload); - records.add(new LogRecord(sequencer.nextId(), payload)); - } - return records; - } - - private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> futureList) { - return futureList.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() { - @Override - public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> results) { - return Future$.MODULE$.collectToTry(results); - } - }); - } - - @Override - protected void fail(ResponseHeader header) { - if (StatusCode.FOUND == header.getCode()) { - redirectRecordCounter.add(buffers.size()); - } else { - failureRecordCounter.add(buffers.size()); - } - setResponse(ResponseUtils.bulkWrite(header)); - } - - @Override - public Future<ResponseHeader> responseHeader() { - return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() { - @Override - public ResponseHeader apply(BulkWriteResponse response) { - return response.getHeader(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java deleted file mode 100644 index 3ecb46f..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Operation to delete a log stream. - */ -public class DeleteOp extends AbstractWriteOp { - private final StreamManager streamManager; - private final Counter deniedDeleteCounter; - private final AccessControlManager accessControlManager; - - public DeleteOp(String stream, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - StreamManager streamManager, - Long checksum, - Feature checksumEnabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "delete"), checksum, checksumEnabledFeature); - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.deniedDeleteCounter = streamOpStats.requestDeniedCounter("delete"); - this.accessControlManager = accessControlManager; - this.streamManager = streamManager; - } - - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - Future<Void> result = streamManager.deleteAndRemoveAsync(streamName()); - return result.map(new AbstractFunction1<Void, WriteResponse>() { - @Override - public WriteResponse apply(Void value) { - return ResponseUtils.writeSuccess(); - } - }); - } - - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowTruncate(stream)) { - deniedDeleteCounter.inc(); - throw new RequestDeniedException(stream, "delete"); - } - super.preExecute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java deleted file mode 100644 index 0ffa619..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import static com.google.common.base.Charsets.UTF_8; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Heartbeat Operation. - */ -public class HeartbeatOp extends AbstractWriteOp { - - static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8); - - private final AccessControlManager accessControlManager; - private final Counter deniedHeartbeatCounter; - private final byte dlsnVersion; - - private boolean writeControlRecord = false; - - public HeartbeatOp(String stream, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - byte dlsnVersion, - Long checksum, - Feature checksumDisabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "heartbeat"), checksum, checksumDisabledFeature); - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.deniedHeartbeatCounter = streamOpStats.requestDeniedCounter("heartbeat"); - this.dlsnVersion = dlsnVersion; - this.accessControlManager = accessControlManager; - } - - public HeartbeatOp setWriteControlRecord(boolean writeControlRecord) { - this.writeControlRecord = writeControlRecord; - return this; - } - - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - // write a control record if heartbeat is the first request of the recovered log segment. - if (writeControlRecord) { - long txnId; - Future<DLSN> writeResult; - synchronized (txnLock) { - txnId = sequencer.nextId(); - LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA); - hbRecord.setControl(); - writeResult = writer.write(hbRecord); - } - return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { - @Override - public WriteResponse apply(DLSN value) { - return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion)); - } - }); - } else { - return Future.value(ResponseUtils.writeSuccess()); - } - } - - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowAcquire(stream)) { - deniedHeartbeatCounter.inc(); - throw new RequestDeniedException(stream, "heartbeat"); - } - super.preExecute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java deleted file mode 100644 index 6ec8642..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.AsyncLogWriter; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RequestDeniedException; -import org.apache.distributedlog.service.ResponseUtils; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; -import com.twitter.util.Future; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import scala.runtime.AbstractFunction1; - -/** - * Operation to release ownership of a log stream. - */ -public class ReleaseOp extends AbstractWriteOp { - private final StreamManager streamManager; - private final Counter deniedReleaseCounter; - private final AccessControlManager accessControlManager; - - public ReleaseOp(String stream, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - StreamManager streamManager, - Long checksum, - Feature checksumDisabledFeature, - AccessControlManager accessControlManager) { - super(stream, requestStat(statsLogger, "release"), checksum, checksumDisabledFeature); - StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - this.deniedReleaseCounter = streamOpStats.requestDeniedCounter("release"); - this.accessControlManager = accessControlManager; - this.streamManager = streamManager; - } - - @Override - protected Future<WriteResponse> executeOp(AsyncLogWriter writer, - Sequencer sequencer, - Object txnLock) { - Future<Void> result = streamManager.closeAndRemoveAsync(streamName()); - return result.map(new AbstractFunction1<Void, WriteResponse>() { - @Override - public WriteResponse apply(Void value) { - return ResponseUtils.writeSuccess(); - } - }); - } - - @Override - public void preExecute() throws DLException { - if (!accessControlManager.allowRelease(stream)) { - deniedReleaseCounter.inc(); - throw new RequestDeniedException(stream, "release"); - } - super.preExecute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java deleted file mode 100644 index 3517a63..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/Stream.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.service.streamset.Partition; -import com.twitter.util.Future; -import java.io.IOException; - -/** - * Stream is the per stream request handler in the DL service layer. - * - * <p>The collection of Streams in the proxy are managed by StreamManager. - */ -public interface Stream { - - /** - * Get the stream configuration for this stream. - * - * @return stream configuration - */ - DynamicDistributedLogConfiguration getStreamConfiguration(); - - /** - * Get the stream's last recorded current owner (may be out of date). Used - * as a hint for the client. - * @return last known owner for the stream - */ - String getOwner(); - - /** - * Get the stream name. - * @return stream name - */ - String getStreamName(); - - /** - * Get the represented partition name. - * - * @return represented partition name. - */ - Partition getPartition(); - - /** - * Expensive initialization code run after stream has been allocated in - * StreamManager. - * - * @throws IOException when encountered exception on initialization - */ - void initialize() throws IOException; - - /** - * Another initialize method (actually Thread.start). Should probably be - * moved to initialize(). - */ - void start(); - - /** - * Asynchronous close method. - * @param reason for closing - * @return future satisfied once close complete - */ - Future<Void> requestClose(String reason); - - /** - * Delete the stream from DL backend. - * - * @throws IOException when encountered exception on deleting the stream. - */ - void delete() throws IOException; - - /** - * Execute the stream operation against this stream. - * - * @param op operation to execute - */ - void submit(StreamOp op); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java deleted file mode 100644 index 845ef21..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; - -/** - * Factory to create a stream with provided stream configuration {@code streamConf}. - */ -public interface StreamFactory { - - /** - * Create a stream object. - * - * @param name stream name - * @param streamConf stream configuration - * @param streamManager manager of streams - * @return stream object - */ - Stream create(String name, - DynamicDistributedLogConfiguration streamConf, - StreamManager streamManager); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java deleted file mode 100644 index 2b90d55..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.stream; - -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.service.FatalErrorHandler; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Timer; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.jboss.netty.util.HashedWheelTimer; - -/** - * The implementation of {@link StreamFactory}. - */ -public class StreamFactoryImpl implements StreamFactory { - private final String clientId; - private final StreamOpStats streamOpStats; - private final ServerConfiguration serverConfig; - private final DistributedLogConfiguration dlConfig; - private final FeatureProvider featureProvider; - private final StreamConfigProvider streamConfigProvider; - private final StreamPartitionConverter streamPartitionConverter; - private final DistributedLogNamespace dlNamespace; - private final OrderedScheduler scheduler; - private final FatalErrorHandler fatalErrorHandler; - private final HashedWheelTimer requestTimer; - private final Timer futureTimer; - - public StreamFactoryImpl(String clientId, - StreamOpStats streamOpStats, - ServerConfiguration serverConfig, - DistributedLogConfiguration dlConfig, - FeatureProvider featureProvider, - StreamConfigProvider streamConfigProvider, - StreamPartitionConverter streamPartitionConverter, - DistributedLogNamespace dlNamespace, - OrderedScheduler scheduler, - FatalErrorHandler fatalErrorHandler, - HashedWheelTimer requestTimer) { - - this.clientId = clientId; - this.streamOpStats = streamOpStats; - this.serverConfig = serverConfig; - this.dlConfig = dlConfig; - this.featureProvider = featureProvider; - this.streamConfigProvider = streamConfigProvider; - this.streamPartitionConverter = streamPartitionConverter; - this.dlNamespace = dlNamespace; - this.scheduler = scheduler; - this.fatalErrorHandler = fatalErrorHandler; - this.requestTimer = requestTimer; - this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer); - } - - @Override - public Stream create(String name, - DynamicDistributedLogConfiguration streamConf, - StreamManager streamManager) { - return new StreamImpl(name, - streamPartitionConverter.convert(name), - clientId, - streamManager, - streamOpStats, - serverConfig, - dlConfig, - streamConf, - featureProvider, - streamConfigProvider, - dlNamespace, - scheduler, - fatalErrorHandler, - requestTimer, - futureTimer); - } -}