http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java new file mode 100644 index 0000000..862f05a --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.placement; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.ZooKeeperClient; +import org.apache.distributedlog.impl.BKNamespaceDriver; +import org.apache.distributedlog.util.Utils; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.TreeSet; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to + * avoid necessitating an additional system for the resource placement. + */ +public class ZKPlacementStateManager implements PlacementStateManager { + + private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class); + + private static final String SERVER_LOAD_DIR = "/.server-load"; + + private final String serverLoadPath; + private final ZooKeeperClient zkClient; + + private boolean watching = false; + + public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { + String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri); + zkClient = BKNamespaceDriver.createZKClientBuilder( + String.format("ZKPlacementStateManager-%s", zkServers), + conf, + zkServers, + statsLogger.scope("placement_state_manager")).build(); + serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; + } + + private void createServerLoadPathIfNoExists(byte[] data) + throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { + try { + Utils.zkCreateFullPathOptimistic( + zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + logger.debug("the server load path {} is already created by others", serverLoadPath, nee); + } + } + + @Override + public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException { + logger.info("saving ownership"); + try { + ZooKeeper zk = zkClient.get(); + // use timestamp as data so watchers will see any changes + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + + if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist + createServerLoadPathIfNoExists(timestamp); + } + + Transaction tx = zk.transaction(); + List<String> children = zk.getChildren(serverLoadPath, false); + HashSet<String> servers = new HashSet<String>(children); + tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated + for (ServerLoad serverLoad : serverLoads) { + String server = serverToZkFormat(serverLoad.getServer()); + String serverPath = serverPath(server); + if (servers.contains(server)) { + servers.remove(server); + tx.setData(serverPath, serverLoad.serialize(), -1); + } else { + tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } + } + for (String server : servers) { + tx.delete(serverPath(server), -1); + } + tx.commit(); + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerSaveException(e); + } + } + + @Override + public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException { + TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); + try { + ZooKeeper zk = zkClient.get(); + List<String> children = zk.getChildren(serverLoadPath, false); + for (String server : children) { + ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat()))); + } + return ownerships; + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerLoadException(e); + } + } + + @Override + public synchronized void watch(final PlacementCallback callback) { + if (watching) { + return; // do not double watch + } + watching = true; + + try { + ZooKeeper zk = zkClient.get(); + try { + zk.getData(serverLoadPath, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + try { + callback.callback(loadOwnership()); + } catch (StateManagerLoadException e) { + logger.error("Watch of Ownership failed", e); + } finally { + watching = false; + watch(callback); + } + } + }, new Stat()); + } catch (KeeperException.NoNodeException nee) { + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + createServerLoadPathIfNoExists(timestamp); + watching = false; + watch(callback); + } + } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { + logger.error("Watch of Ownership failed", e); + watching = false; + watch(callback); + } + } + + public String serverPath(String server) { + return String.format("%s/%s", serverLoadPath, server); + } + + protected String serverToZkFormat(String server) { + return server.replaceAll("/", "--"); + } + + protected String zkFormatToServer(String zkFormattedServer) { + return zkFormattedServer.replaceAll("--", "/"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java new file mode 100644 index 0000000..ea79251 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Placement Policy to place streams across proxy services. + */ +package org.apache.distributedlog.service.placement; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java new file mode 100644 index 0000000..83ac668 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.exceptions.ChecksumFailedException; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Try; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +/** + * Abstract Stream Operation. + */ +public abstract class AbstractStreamOp<Response> implements StreamOp { + + private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class); + + protected final String stream; + protected final OpStatsLogger opStatsLogger; + private final Promise<Response> result = new Promise<Response>(); + protected final Stopwatch stopwatch = Stopwatch.createUnstarted(); + protected final Long checksum; + protected final Feature checksumDisabledFeature; + + public AbstractStreamOp(String stream, + OpStatsLogger statsLogger, + Long checksum, + Feature checksumDisabledFeature) { + this.stream = stream; + this.opStatsLogger = statsLogger; + // start here in case the operation is failed before executing. + stopwatch.reset().start(); + this.checksum = checksum; + this.checksumDisabledFeature = checksumDisabledFeature; + } + + @Override + public String streamName() { + return stream; + } + + @Override + public Stopwatch stopwatch() { + return stopwatch; + } + + @Override + public void preExecute() throws DLException { + if (!checksumDisabledFeature.isAvailable() && null != checksum) { + Long serverChecksum = computeChecksum(); + if (null != serverChecksum && !checksum.equals(serverChecksum)) { + throw new ChecksumFailedException(); + } + } + } + + @Override + public Long computeChecksum() { + return null; + } + + @Override + public Future<Void> execute(AsyncLogWriter writer, Sequencer sequencer, Object txnLock) { + stopwatch.reset().start(); + return executeOp(writer, sequencer, txnLock) + .addEventListener(new FutureEventListener<Response>() { + @Override + public void onSuccess(Response response) { + opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + setResponse(response); + } + @Override + public void onFailure(Throwable cause) { + } + }).voided(); + } + + /** + * Fail with current <i>owner</i> and its reason <i>t</i>. + * + * @param cause + * failure reason + */ + @Override + public void fail(Throwable cause) { + if (cause instanceof OwnershipAcquireFailedException) { + // Ownership exception is a control exception, not an error, so we don't stat + // it with the other errors. + OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause; + fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner())); + } else { + opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + fail(ResponseUtils.exceptionToHeader(cause)); + } + } + + protected void setResponse(Response response) { + Return<Response> responseTry = new Return(response); + boolean isEmpty = result.updateIfEmpty(responseTry); + if (!isEmpty) { + Option<Try<Response>> resultTry = result.poll(); + logger.error("Result set multiple times. Value='{}', New='{}'", resultTry, responseTry); + } + } + + /** + * Return the full response, header and body. + * + * @return A future containing the response or the exception + * encountered by the op if it failed. + */ + public Future<Response> result() { + return result; + } + + /** + * Execute the operation and return its corresponding response. + * + * @param writer + * writer to execute the operation. + * @param sequencer + * sequencer used for generating transaction id for stream operations + * @param txnLock + * transaction lock to guarantee ordering of transaction id + * @return future representing the operation. + */ + protected abstract Future<Response> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock); + + // fail the result with the given response header + protected abstract void fail(ResponseHeader header); + + public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) { + return requestLogger(statsLogger).getOpStatsLogger(opName); + } + + public static StatsLogger requestLogger(StatsLogger statsLogger) { + return statsLogger.scope("request"); + } + + public static StatsLogger requestScope(StatsLogger statsLogger, String scope) { + return requestLogger(statsLogger).scope(scope); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java new file mode 100644 index 0000000..8befffc --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractWriteOp.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.protocol.util.ProtocolUtils; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.WriteResponse; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.OpStatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Abstract Write Operation. + */ +public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> { + + protected AbstractWriteOp(String stream, + OpStatsLogger statsLogger, + Long checksum, + Feature checksumDisabledFeature) { + super(stream, statsLogger, checksum, checksumDisabledFeature); + } + + @Override + protected void fail(ResponseHeader header) { + setResponse(ResponseUtils.write(header)); + } + + @Override + public Long computeChecksum() { + return ProtocolUtils.streamOpCRC32(stream); + } + + @Override + public Future<ResponseHeader> responseHeader() { + return result().map(new AbstractFunction1<WriteResponse, ResponseHeader>() { + @Override + public ResponseHeader apply(WriteResponse response) { + return response.getHeader(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java new file mode 100644 index 0000000..6c98468 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.LockingException; +import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.service.streamset.Partition; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.ConstFuture; +import com.twitter.util.Future; +import com.twitter.util.Future$; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Try; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Bulk Write Operation. + */ +public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload { + private final List<ByteBuffer> buffers; + private final long payloadSize; + + // Stats + private final Counter deniedBulkWriteCounter; + private final Counter successRecordCounter; + private final Counter failureRecordCounter; + private final Counter redirectRecordCounter; + private final OpStatsLogger latencyStat; + private final Counter bytes; + private final Counter bulkWriteBytes; + + private final AccessControlManager accessControlManager; + + // We need to pass these through to preserve ownership change behavior in + // client/server. Only include failures which are guaranteed to have failed + // all subsequent writes. + private boolean isDefiniteFailure(Try<DLSN> result) { + boolean def = false; + try { + result.get(); + } catch (Exception ex) { + if (ex instanceof OwnershipAcquireFailedException + || ex instanceof AlreadyClosedException + || ex instanceof LockingException) { + def = true; + } + } + return def; + } + + public BulkWriteOp(String stream, + List<ByteBuffer> buffers, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + StreamPartitionConverter streamPartitionConverter, + Long checksum, + Feature checksumDisabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "bulkWrite"), checksum, checksumDisabledFeature); + this.buffers = buffers; + long total = 0; + // We do this here because the bytebuffers are mutable. + for (ByteBuffer bb : buffers) { + total += bb.remaining(); + } + this.payloadSize = total; + + final Partition partition = streamPartitionConverter.convert(stream); + // Write record stats + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite"); + this.successRecordCounter = streamOpStats.recordsCounter("success"); + this.failureRecordCounter = streamOpStats.recordsCounter("failure"); + this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); + this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes"); + this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite"); + this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes"); + + this.accessControlManager = accessControlManager; + + final long size = getPayloadSize(); + result().addEventListener(new FutureEventListener<BulkWriteResponse>() { + @Override + public void onSuccess(BulkWriteResponse response) { + if (response.getHeader().getCode() == StatusCode.SUCCESS) { + latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + bytes.add(size); + bulkWriteBytes.add(size); + } else { + latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + } + } + @Override + public void onFailure(Throwable cause) { + latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + } + }); + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowWrite(stream)) { + deniedBulkWriteCounter.inc(); + throw new RequestDeniedException(stream, "bulkWrite"); + } + super.preExecute(); + } + + @Override + public long getPayloadSize() { + return payloadSize; + } + + @Override + protected Future<BulkWriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + // Need to convert input buffers to LogRecords. + List<LogRecord> records; + Future<List<Future<DLSN>>> futureList; + synchronized (txnLock) { + records = asRecordList(buffers, sequencer); + futureList = writer.writeBulk(records); + } + + // Collect into a list of tries to make it easier to extract exception or DLSN. + Future<List<Try<DLSN>>> writes = asTryList(futureList); + + Future<BulkWriteResponse> response = writes.flatMap( + new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() { + @Override + public Future<BulkWriteResponse> apply(List<Try<DLSN>> results) { + + // Considered a success at batch level even if no individual writes succeeed. + // The reason is that its impossible to make an appropriate decision re retries without + // individual buffer failure reasons. + List<WriteResponse> writeResponses = new ArrayList<WriteResponse>(results.size()); + BulkWriteResponse bulkWriteResponse = + ResponseUtils.bulkWriteSuccess().setWriteResponses(writeResponses); + + // Promote the first result to an op-level failure if we're sure all other writes have + // failed. + if (results.size() > 0) { + Try<DLSN> firstResult = results.get(0); + if (isDefiniteFailure(firstResult)) { + return new ConstFuture(firstResult); + } + } + + // Translate all futures to write responses. + Iterator<Try<DLSN>> iterator = results.iterator(); + while (iterator.hasNext()) { + Try<DLSN> completedFuture = iterator.next(); + try { + DLSN dlsn = completedFuture.get(); + WriteResponse writeResponse = ResponseUtils.writeSuccess().setDlsn(dlsn.serialize()); + writeResponses.add(writeResponse); + successRecordCounter.inc(); + } catch (Exception ioe) { + WriteResponse writeResponse = ResponseUtils.write(ResponseUtils.exceptionToHeader(ioe)); + writeResponses.add(writeResponse); + if (StatusCode.FOUND == writeResponse.getHeader().getCode()) { + redirectRecordCounter.inc(); + } else { + failureRecordCounter.inc(); + } + } + } + + return Future.value(bulkWriteResponse); + } + } + ); + + return response; + } + + private List<LogRecord> asRecordList(List<ByteBuffer> buffers, Sequencer sequencer) { + List<LogRecord> records = new ArrayList<LogRecord>(buffers.size()); + for (ByteBuffer buffer : buffers) { + byte[] payload = new byte[buffer.remaining()]; + buffer.get(payload); + records.add(new LogRecord(sequencer.nextId(), payload)); + } + return records; + } + + private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> futureList) { + return futureList.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() { + @Override + public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> results) { + return Future$.MODULE$.collectToTry(results); + } + }); + } + + @Override + protected void fail(ResponseHeader header) { + if (StatusCode.FOUND == header.getCode()) { + redirectRecordCounter.add(buffers.size()); + } else { + failureRecordCounter.add(buffers.size()); + } + setResponse(ResponseUtils.bulkWrite(header)); + } + + @Override + public Future<ResponseHeader> responseHeader() { + return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() { + @Override + public ResponseHeader apply(BulkWriteResponse response) { + return response.getHeader(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java new file mode 100644 index 0000000..3ecb46f --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Operation to delete a log stream. + */ +public class DeleteOp extends AbstractWriteOp { + private final StreamManager streamManager; + private final Counter deniedDeleteCounter; + private final AccessControlManager accessControlManager; + + public DeleteOp(String stream, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + StreamManager streamManager, + Long checksum, + Feature checksumEnabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "delete"), checksum, checksumEnabledFeature); + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.deniedDeleteCounter = streamOpStats.requestDeniedCounter("delete"); + this.accessControlManager = accessControlManager; + this.streamManager = streamManager; + } + + @Override + protected Future<WriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + Future<Void> result = streamManager.deleteAndRemoveAsync(streamName()); + return result.map(new AbstractFunction1<Void, WriteResponse>() { + @Override + public WriteResponse apply(Void value) { + return ResponseUtils.writeSuccess(); + } + }); + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowTruncate(stream)) { + deniedDeleteCounter.inc(); + throw new RequestDeniedException(stream, "delete"); + } + super.preExecute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java new file mode 100644 index 0000000..0ffa619 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import static com.google.common.base.Charsets.UTF_8; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Heartbeat Operation. + */ +public class HeartbeatOp extends AbstractWriteOp { + + static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8); + + private final AccessControlManager accessControlManager; + private final Counter deniedHeartbeatCounter; + private final byte dlsnVersion; + + private boolean writeControlRecord = false; + + public HeartbeatOp(String stream, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + byte dlsnVersion, + Long checksum, + Feature checksumDisabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "heartbeat"), checksum, checksumDisabledFeature); + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.deniedHeartbeatCounter = streamOpStats.requestDeniedCounter("heartbeat"); + this.dlsnVersion = dlsnVersion; + this.accessControlManager = accessControlManager; + } + + public HeartbeatOp setWriteControlRecord(boolean writeControlRecord) { + this.writeControlRecord = writeControlRecord; + return this; + } + + @Override + protected Future<WriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + // write a control record if heartbeat is the first request of the recovered log segment. + if (writeControlRecord) { + long txnId; + Future<DLSN> writeResult; + synchronized (txnLock) { + txnId = sequencer.nextId(); + LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA); + hbRecord.setControl(); + writeResult = writer.write(hbRecord); + } + return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { + @Override + public WriteResponse apply(DLSN value) { + return ResponseUtils.writeSuccess().setDlsn(value.serialize(dlsnVersion)); + } + }); + } else { + return Future.value(ResponseUtils.writeSuccess()); + } + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowAcquire(stream)) { + deniedHeartbeatCounter.inc(); + throw new RequestDeniedException(stream, "heartbeat"); + } + super.preExecute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java new file mode 100644 index 0000000..6ec8642 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RequestDeniedException; +import org.apache.distributedlog.service.ResponseUtils; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.Sequencer; +import com.twitter.util.Future; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.StatsLogger; +import scala.runtime.AbstractFunction1; + +/** + * Operation to release ownership of a log stream. + */ +public class ReleaseOp extends AbstractWriteOp { + private final StreamManager streamManager; + private final Counter deniedReleaseCounter; + private final AccessControlManager accessControlManager; + + public ReleaseOp(String stream, + StatsLogger statsLogger, + StatsLogger perStreamStatsLogger, + StreamManager streamManager, + Long checksum, + Feature checksumDisabledFeature, + AccessControlManager accessControlManager) { + super(stream, requestStat(statsLogger, "release"), checksum, checksumDisabledFeature); + StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); + this.deniedReleaseCounter = streamOpStats.requestDeniedCounter("release"); + this.accessControlManager = accessControlManager; + this.streamManager = streamManager; + } + + @Override + protected Future<WriteResponse> executeOp(AsyncLogWriter writer, + Sequencer sequencer, + Object txnLock) { + Future<Void> result = streamManager.closeAndRemoveAsync(streamName()); + return result.map(new AbstractFunction1<Void, WriteResponse>() { + @Override + public WriteResponse apply(Void value) { + return ResponseUtils.writeSuccess(); + } + }); + } + + @Override + public void preExecute() throws DLException { + if (!accessControlManager.allowRelease(stream)) { + deniedReleaseCounter.inc(); + throw new RequestDeniedException(stream, "release"); + } + super.preExecute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java new file mode 100644 index 0000000..3517a63 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/Stream.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.service.streamset.Partition; +import com.twitter.util.Future; +import java.io.IOException; + +/** + * Stream is the per stream request handler in the DL service layer. + * + * <p>The collection of Streams in the proxy are managed by StreamManager. + */ +public interface Stream { + + /** + * Get the stream configuration for this stream. + * + * @return stream configuration + */ + DynamicDistributedLogConfiguration getStreamConfiguration(); + + /** + * Get the stream's last recorded current owner (may be out of date). Used + * as a hint for the client. + * @return last known owner for the stream + */ + String getOwner(); + + /** + * Get the stream name. + * @return stream name + */ + String getStreamName(); + + /** + * Get the represented partition name. + * + * @return represented partition name. + */ + Partition getPartition(); + + /** + * Expensive initialization code run after stream has been allocated in + * StreamManager. + * + * @throws IOException when encountered exception on initialization + */ + void initialize() throws IOException; + + /** + * Another initialize method (actually Thread.start). Should probably be + * moved to initialize(). + */ + void start(); + + /** + * Asynchronous close method. + * @param reason for closing + * @return future satisfied once close complete + */ + Future<Void> requestClose(String reason); + + /** + * Delete the stream from DL backend. + * + * @throws IOException when encountered exception on deleting the stream. + */ + void delete() throws IOException; + + /** + * Execute the stream operation against this stream. + * + * @param op operation to execute + */ + void submit(StreamOp op); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java new file mode 100644 index 0000000..845ef21 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactory.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; + +/** + * Factory to create a stream with provided stream configuration {@code streamConf}. + */ +public interface StreamFactory { + + /** + * Create a stream object. + * + * @param name stream name + * @param streamConf stream configuration + * @param streamManager manager of streams + * @return stream object + */ + Stream create(String name, + DynamicDistributedLogConfiguration streamConf, + StreamManager streamManager); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java new file mode 100644 index 0000000..2b90d55 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.stream; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.service.FatalErrorHandler; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.util.OrderedScheduler; +import com.twitter.util.Timer; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.jboss.netty.util.HashedWheelTimer; + +/** + * The implementation of {@link StreamFactory}. + */ +public class StreamFactoryImpl implements StreamFactory { + private final String clientId; + private final StreamOpStats streamOpStats; + private final ServerConfiguration serverConfig; + private final DistributedLogConfiguration dlConfig; + private final FeatureProvider featureProvider; + private final StreamConfigProvider streamConfigProvider; + private final StreamPartitionConverter streamPartitionConverter; + private final DistributedLogNamespace dlNamespace; + private final OrderedScheduler scheduler; + private final FatalErrorHandler fatalErrorHandler; + private final HashedWheelTimer requestTimer; + private final Timer futureTimer; + + public StreamFactoryImpl(String clientId, + StreamOpStats streamOpStats, + ServerConfiguration serverConfig, + DistributedLogConfiguration dlConfig, + FeatureProvider featureProvider, + StreamConfigProvider streamConfigProvider, + StreamPartitionConverter streamPartitionConverter, + DistributedLogNamespace dlNamespace, + OrderedScheduler scheduler, + FatalErrorHandler fatalErrorHandler, + HashedWheelTimer requestTimer) { + + this.clientId = clientId; + this.streamOpStats = streamOpStats; + this.serverConfig = serverConfig; + this.dlConfig = dlConfig; + this.featureProvider = featureProvider; + this.streamConfigProvider = streamConfigProvider; + this.streamPartitionConverter = streamPartitionConverter; + this.dlNamespace = dlNamespace; + this.scheduler = scheduler; + this.fatalErrorHandler = fatalErrorHandler; + this.requestTimer = requestTimer; + this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer); + } + + @Override + public Stream create(String name, + DynamicDistributedLogConfiguration streamConf, + StreamManager streamManager) { + return new StreamImpl(name, + streamPartitionConverter.convert(name), + clientId, + streamManager, + streamOpStats, + serverConfig, + dlConfig, + streamConf, + featureProvider, + streamConfigProvider, + dlNamespace, + scheduler, + fatalErrorHandler, + requestTimer, + futureTimer); + } +}