http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstRecordSelector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstRecordSelector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstRecordSelector.java deleted file mode 100644 index a51210f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstRecordSelector.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.selector; - -import com.twitter.distributedlog.LogRecordWithDLSN; - -/** - * Save the first record processed - */ -public class FirstRecordSelector implements LogRecordSelector { - - final boolean includeControl; - LogRecordWithDLSN firstRecord; - - public FirstRecordSelector(boolean includeControl) { - this.includeControl = includeControl; - } - - @Override - public void process(LogRecordWithDLSN record) { - if (null == this.firstRecord - && (includeControl || !record.isControl())) { - this.firstRecord = record; - } - } - - @Override - public LogRecordWithDLSN result() { - return this.firstRecord; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstTxIdNotLessThanSelector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstTxIdNotLessThanSelector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstTxIdNotLessThanSelector.java deleted file mode 100644 index 03c2cbb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/FirstTxIdNotLessThanSelector.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.selector; - -import com.twitter.distributedlog.LogRecordWithDLSN; - -/** - * Save the first record with transaction id not less than the provided transaction id. - * If all records' transaction id is less than provided transaction id, save the last record. - */ -public class FirstTxIdNotLessThanSelector implements LogRecordSelector { - - LogRecordWithDLSN result; - final long txId; - boolean found = false; - - public FirstTxIdNotLessThanSelector(long txId) { - this.txId = txId; - } - - @Override - public void process(LogRecordWithDLSN record) { - if (found) { - return; - } - this.result = record; - if (record.getTransactionId() >= txId) { - found = true; - } - } - - @Override - public LogRecordWithDLSN result() { - return this.result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LastRecordSelector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LastRecordSelector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LastRecordSelector.java deleted file mode 100644 index 191342c..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LastRecordSelector.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.selector; - -import com.twitter.distributedlog.LogRecordWithDLSN; - -/** - * Save the last record processed. - */ -public class LastRecordSelector implements LogRecordSelector { - - LogRecordWithDLSN lastRecord; - - @Override - public void process(LogRecordWithDLSN record) { - lastRecord = record; - } - - @Override - public LogRecordWithDLSN result() { - return lastRecord; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LogRecordSelector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LogRecordSelector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LogRecordSelector.java deleted file mode 100644 index 45d1c49..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/selector/LogRecordSelector.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.selector; - -import com.twitter.distributedlog.LogRecordWithDLSN; - -/** - * Visitor interface to process a set of records, and return some result. - */ -public interface LogRecordSelector { - /** - * Process a given <code>record</code>. - * - * @param record - * log record to process - */ - void process(LogRecordWithDLSN record); - - /** - * Returned the selected log record after processing a set of records. - * - * @return the selected log record. - */ - LogRecordWithDLSN result(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BKExceptionStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BKExceptionStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BKExceptionStatsLogger.java deleted file mode 100644 index be71aef..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BKExceptionStatsLogger.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.stats; - -import org.apache.bookkeeper.client.BKException.Code; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; - -import java.util.HashMap; -import java.util.Map; - -/** - * A Util to logger stats on bk exceptions. - */ -public class BKExceptionStatsLogger { - - public static String getMessage(int code) { - switch (code) { - case Code.OK: - return "OK"; - case Code.ReadException: - return "ReadException"; - case Code.QuorumException: - return "QuorumException"; - case Code.NoBookieAvailableException: - return "NoBookieAvailableException"; - case Code.DigestNotInitializedException: - return "DigestNotInitializedException"; - case Code.DigestMatchException: - return "DigestMatchException"; - case Code.NotEnoughBookiesException: - return "NotEnoughBookiesException"; - case Code.NoSuchLedgerExistsException: - return "NoSuchLedgerExistsException"; - case Code.BookieHandleNotAvailableException: - return "BookieHandleNotAvailableException"; - case Code.ZKException: - return "ZKException"; - case Code.LedgerRecoveryException: - return "LedgerRecoveryException"; - case Code.LedgerClosedException: - return "LedgerClosedException"; - case Code.WriteException: - return "WriteException"; - case Code.NoSuchEntryException: - return "NoSuchEntryException"; - case Code.IncorrectParameterException: - return "IncorrectParameterException"; - case Code.InterruptedException: - return "InterruptedException"; - case Code.ProtocolVersionException: - return "ProtocolVersionException"; - case Code.MetadataVersionException: - return "MetadataVersionException"; - case Code.LedgerFencedException: - return "LedgerFencedException"; - case Code.UnauthorizedAccessException: - return "UnauthorizedAccessException"; - case Code.UnclosedFragmentException: - return "UnclosedFragmentException"; - case Code.WriteOnReadOnlyBookieException: - return "WriteOnReadOnlyBookieException"; - case Code.IllegalOpException: - return "IllegalOpException"; - default: - return "UnexpectedException"; - } - } - - private final StatsLogger parentLogger; - private final Map<Integer, Counter> exceptionCounters; - - public BKExceptionStatsLogger(StatsLogger parentLogger) { - this.parentLogger = parentLogger; - this.exceptionCounters = new HashMap<Integer, Counter>(); - } - - public Counter getExceptionCounter(int rc) { - Counter counter = exceptionCounters.get(rc); - if (null != counter) { - return counter; - } - // TODO: it would be better to have BKException.Code.get(rc) - synchronized (exceptionCounters) { - counter = exceptionCounters.get(rc); - if (null != counter) { - return counter; - } - counter = parentLogger.getCounter(getMessage(rc)); - exceptionCounters.put(rc, counter); - } - return counter; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java deleted file mode 100644 index 10a7011..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.stats; - -import com.google.common.base.Preconditions; - -import org.apache.bookkeeper.stats.CachingStatsLogger; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsData; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Stats Loggers that broadcast stats to multiple {@link StatsLogger}. - */ -public class BroadCastStatsLogger { - - /** - * Create a broadcast stats logger of two stats loggers `<code>first</code>` and - * `<code>second</code>`. The returned stats logger doesn't allow registering any - * {@link Gauge}. - * - * @param first - * first stats logger - * @param second - * second stats logger - * @return broadcast stats logger - */ - public static StatsLogger two(StatsLogger first, StatsLogger second) { - return new CachingStatsLogger(new Two(first, second)); - } - - static class Two implements StatsLogger { - protected final StatsLogger first; - protected final StatsLogger second; - - private Two(StatsLogger first, StatsLogger second) { - super(); - Preconditions.checkNotNull(first); - Preconditions.checkNotNull(second); - this.first = first; - this.second = second; - } - - @Override - public OpStatsLogger getOpStatsLogger(final String statName) { - final OpStatsLogger firstLogger = first.getOpStatsLogger(statName); - final OpStatsLogger secondLogger = second.getOpStatsLogger(statName); - return new OpStatsLogger() { - @Override - public void registerFailedEvent(long l) { - firstLogger.registerFailedEvent(l); - secondLogger.registerFailedEvent(l); - } - - @Override - public void registerSuccessfulEvent(long l) { - firstLogger.registerSuccessfulEvent(l); - secondLogger.registerSuccessfulEvent(l); - } - - @Override - public OpStatsData toOpStatsData() { - // Eventually consistent. - return firstLogger.toOpStatsData(); - } - - @Override - public void clear() { - firstLogger.clear(); - secondLogger.clear(); - } - }; - } - - @Override - public Counter getCounter(final String statName) { - final Counter firstCounter = first.getCounter(statName); - final Counter secondCounter = second.getCounter(statName); - return new Counter() { - @Override - public void clear() { - firstCounter.clear(); - secondCounter.clear(); - } - - @Override - public void inc() { - firstCounter.inc(); - secondCounter.inc(); - } - - @Override - public void dec() { - firstCounter.dec(); - secondCounter.dec(); - } - - @Override - public void add(long l) { - firstCounter.add(l); - secondCounter.add(l); - } - - @Override - public Long get() { - // Eventually consistent. - return firstCounter.get(); - } - }; - } - - @Override - public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { - // Different underlying stats loggers have different semantics wrt. gauge registration. - throw new RuntimeException("Cannot register a gauge on BroadCastStatsLogger.Two"); - } - - @Override - public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { - // no-op - } - - @Override - public StatsLogger scope(final String scope) { - return new Two(first.scope(scope), second.scope(scope)); - } - - @Override - public void removeScope(String scope, StatsLogger statsLogger) { - if (!(statsLogger instanceof Two)) { - return; - } - - Two another = (Two) statsLogger; - - first.removeScope(scope, another.first); - second.removeScope(scope, another.second); - } - } - - /** - * Create a broadcast stats logger of two stats loggers <code>master</code> and <code>slave</code>. - * It is similar as {@link #two(StatsLogger, StatsLogger)}, but it allows registering {@link Gauge}s. - * The {@link Gauge} will be registered under master. - * - * @param master - * master stats logger to receive {@link Counter}, {@link OpStatsLogger} and {@link Gauge}. - * @param slave - * slave stats logger to receive only {@link Counter} and {@link OpStatsLogger}. - * @return broadcast stats logger - */ - public static StatsLogger masterslave(StatsLogger master, StatsLogger slave) { - return new CachingStatsLogger(new MasterSlave(master, slave)); - } - - static class MasterSlave extends Two { - - private MasterSlave(StatsLogger master, StatsLogger slave) { - super(master, slave); - } - - @Override - public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { - first.registerGauge(statName, gauge); - } - - @Override - public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { - first.unregisterGauge(statName, gauge); - } - - @Override - public StatsLogger scope(String scope) { - return new MasterSlave(first.scope(scope), second.scope(scope)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/OpStatsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/OpStatsListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/OpStatsListener.java deleted file mode 100644 index 0432706..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/OpStatsListener.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.stats; - -import com.google.common.base.Stopwatch; -import com.twitter.util.FutureEventListener; -import org.apache.bookkeeper.stats.OpStatsLogger; -import java.util.concurrent.TimeUnit; - -public class OpStatsListener<T> implements FutureEventListener<T> { - OpStatsLogger opStatsLogger; - Stopwatch stopwatch; - - public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) { - this.opStatsLogger = opStatsLogger; - if (null == stopwatch) { - this.stopwatch = Stopwatch.createStarted(); - } else { - this.stopwatch = stopwatch; - } - } - - public OpStatsListener(OpStatsLogger opStatsLogger) { - this(opStatsLogger, null); - } - - @Override - public void onSuccess(T value) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - - @Override - public void onFailure(Throwable cause) { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionStateStore.java deleted file mode 100644 index 9e4c4f2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionStateStore.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.subscription; - -import java.io.Closeable; - -import scala.runtime.BoxedUnit; - -import com.twitter.distributedlog.DLSN; -import com.twitter.util.Future; - -public interface SubscriptionStateStore extends Closeable { - /** - * Get the last committed position stored for this subscription - * - * @return future represents the last commit position - */ - public Future<DLSN> getLastCommitPosition(); - - /** - * Advances the position associated with the subscriber - * - * @param newPosition - new commit position - * @return future represents the advance result - */ - public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java deleted file mode 100644 index 27d5c1d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/subscription/SubscriptionsStore.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.subscription; - -import com.twitter.distributedlog.DLSN; -import com.twitter.util.Future; -import scala.runtime.BoxedUnit; - -import java.io.Closeable; -import java.util.Map; - -/** - * Store to manage subscriptions - */ -public interface SubscriptionsStore extends Closeable { - - /** - * Get the last committed position stored for <i>subscriberId</i>. - * - * @param subscriberId - * subscriber id - * @return future representing last committed position. - */ - public Future<DLSN> getLastCommitPosition(String subscriberId); - - /** - * Get the last committed positions for all subscribers. - * - * @return future representing last committed positions for all subscribers. - */ - public Future<Map<String, DLSN>> getLastCommitPositions(); - - /** - * Advance the last committed position for <i>subscriberId</i>. - * - * @param subscriberId - * subscriber id. - * @param newPosition - * new committed position. - * @return future representing advancing result. - */ - public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition); - - /** - * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the - * data stored under this subscriber will be lost. - * @param subscriberId subscriber id - * @return future represent success or failure. - * return true only if there's such subscriber and we removed it successfully. - * return false if there's no such subscriber, or we failed to remove. - */ - public Future<Boolean> deleteSubscriber(String subscriberId); - -}