http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java new file mode 100644 index 0000000..ee17950 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines the utilities used across the project. + */ +package org.apache.distributedlog.util;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/resources/findbugsExclude.xml b/distributedlog-common/src/main/resources/findbugsExclude.xml new file mode 100644 index 0000000..ce2c176 --- /dev/null +++ b/distributedlog-common/src/main/resources/findbugsExclude.xml @@ -0,0 +1,32 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+//--> +<FindBugsFilter> + <!-- Suppress the null-parameter-dereference report for the whole FutureUtils class. --> + <Match> + <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/> + <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" /> + </Match> + <!-- Suppress the non-null-parameter violation for FutureUtils$2 (presumably an anonymous inner class of FutureUtils; confirm against the compiled classes). --> + <Match> + <Class name="org.apache.distributedlog.common.concurrent.FutureUtils$2"/> + <Bug pattern="NP_NONNULL_PARAM_VIOLATION" /> + </Match> + <!-- Suppress the method-naming report for the FutureUtils method named "Void", whose name starts with an upper-case letter. --> + <Match> + <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/> + <Method name="Void" /> + <Bug pattern="NM_METHOD_NAMING_CONVENTION" /> + </Match> +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java new file mode 100644 index 0000000..ddfb7ae --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.distributedlog.common.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.LongStream; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.distributedlog.util.OrderedScheduler; +import org.junit.Test; + +/** + * Unit Test for {@link FutureUtils}. + */ +public class TestFutureUtils { + + /** + * Test Exception. 
+ */ + static class TestException extends IOException { + private static final long serialVersionUID = -6256482498453846308L; + + public TestException() { + super("test-exception"); + } + } + + @Test + public void testComplete() throws Exception { + CompletableFuture<Long> future = FutureUtils.createFuture(); + FutureUtils.complete(future, 1024L); + assertEquals(1024L, FutureUtils.result(future).longValue()); + } + + @Test(expected = TestException.class) + public void testCompleteExceptionally() throws Exception { + CompletableFuture<Long> future = FutureUtils.createFuture(); + FutureUtils.completeExceptionally(future, new TestException()); + FutureUtils.result(future); + } + + @Test + public void testWhenCompleteAsync() throws Exception { + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("test-when-complete-async") + .corePoolSize(1) + .build(); + AtomicLong resultHolder = new AtomicLong(0L); + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture<Long> future = FutureUtils.createFuture(); + FutureUtils.whenCompleteAsync( + future, + (result, cause) -> { + resultHolder.set(result); + latch.countDown(); + }, + scheduler, + new Object()); + FutureUtils.complete(future, 1234L); + latch.await(); + assertEquals(1234L, resultHolder.get()); + } + + @Test + public void testProxyToSuccess() throws Exception { + CompletableFuture<Long> src = FutureUtils.createFuture(); + CompletableFuture<Long> target = FutureUtils.createFuture(); + FutureUtils.proxyTo(src, target); + FutureUtils.complete(src, 10L); + assertEquals(10L, FutureUtils.result(target).longValue()); + } + + @Test(expected = TestException.class) + public void testProxyToFailure() throws Exception { + CompletableFuture<Long> src = FutureUtils.createFuture(); + CompletableFuture<Long> target = FutureUtils.createFuture(); + FutureUtils.proxyTo(src, target); + FutureUtils.completeExceptionally(src, new TestException()); + FutureUtils.result(target); + } + + @Test + public void testVoid() 
throws Exception { + CompletableFuture<Void> voidFuture = FutureUtils.Void(); + assertTrue(voidFuture.isDone()); + assertFalse(voidFuture.isCompletedExceptionally()); + assertFalse(voidFuture.isCancelled()); + } + + @Test + public void testCollectEmptyList() throws Exception { + List<CompletableFuture<Integer>> futures = Lists.newArrayList(); + List<Integer> result = FutureUtils.result(FutureUtils.collect(futures)); + assertTrue(result.isEmpty()); + } + + @Test + public void testCollectTenItems() throws Exception { + List<CompletableFuture<Integer>> futures = Lists.newArrayList(); + List<Integer> expectedResults = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + futures.add(FutureUtils.value(i)); + expectedResults.add(i); + } + List<Integer> results = FutureUtils.result(FutureUtils.collect(futures)); + assertEquals(expectedResults, results); + } + + @Test(expected = TestException.class) + public void testCollectFailures() throws Exception { + List<CompletableFuture<Integer>> futures = Lists.newArrayList(); + List<Integer> expectedResults = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + if (i == 9) { + futures.add(FutureUtils.value(i)); + } else { + futures.add(FutureUtils.exception(new TestException())); + } + expectedResults.add(i); + } + FutureUtils.result(FutureUtils.collect(futures)); + } + + @Test + public void testWithinAlreadyDone() throws Exception { + OrderedScheduler scheduler = mock(OrderedScheduler.class); + CompletableFuture<Long> doneFuture = FutureUtils.value(1234L); + CompletableFuture<Long> withinFuture = FutureUtils.within( + doneFuture, + 10, + TimeUnit.MILLISECONDS, + new TestException(), + scheduler, + 1234L); + TimeUnit.MILLISECONDS.sleep(20); + assertTrue(withinFuture.isDone()); + assertFalse(withinFuture.isCancelled()); + assertFalse(withinFuture.isCompletedExceptionally()); + verify(scheduler, times(0)) + .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void 
testWithinZeroTimeout() throws Exception { + OrderedScheduler scheduler = mock(OrderedScheduler.class); + CompletableFuture<Long> newFuture = FutureUtils.createFuture(); + CompletableFuture<Long> withinFuture = FutureUtils.within( + newFuture, + 0, + TimeUnit.MILLISECONDS, + new TestException(), + scheduler, + 1234L); + TimeUnit.MILLISECONDS.sleep(20); + assertFalse(withinFuture.isDone()); + assertFalse(withinFuture.isCancelled()); + assertFalse(withinFuture.isCompletedExceptionally()); + verify(scheduler, times(0)) + .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void testWithinCompleteBeforeTimeout() throws Exception { + OrderedScheduler scheduler = mock(OrderedScheduler.class); + ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class); + when(scheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class))) + .thenAnswer(invocationOnMock -> scheduledFuture); + CompletableFuture<Long> newFuture = FutureUtils.createFuture(); + CompletableFuture<Long> withinFuture = FutureUtils.within( + newFuture, + Long.MAX_VALUE, + TimeUnit.MILLISECONDS, + new TestException(), + scheduler, + 1234L); + assertFalse(withinFuture.isDone()); + assertFalse(withinFuture.isCancelled()); + assertFalse(withinFuture.isCompletedExceptionally()); + + newFuture.complete(5678L); + + assertTrue(withinFuture.isDone()); + assertFalse(withinFuture.isCancelled()); + assertFalse(withinFuture.isCompletedExceptionally()); + assertEquals((Long) 5678L, FutureUtils.result(withinFuture)); + + verify(scheduledFuture, times(1)) + .cancel(eq(true)); + } + + @Test + public void testIgnoreSuccess() { + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture); + underlyFuture.complete(1234L); + assertTrue(ignoredFuture.isDone()); + assertFalse(ignoredFuture.isCompletedExceptionally()); + assertFalse(ignoredFuture.isCancelled()); + } + + @Test + 
public void testIgnoreFailure() { + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture); + underlyFuture.completeExceptionally(new TestException()); + assertTrue(ignoredFuture.isDone()); + assertFalse(ignoredFuture.isCompletedExceptionally()); + assertFalse(ignoredFuture.isCancelled()); + } + + @Test + public void testEnsureSuccess() throws Exception { + CountDownLatch ensureLatch = new CountDownLatch(1); + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> { + ensureLatch.countDown(); + }); + underlyFuture.complete(1234L); + FutureUtils.result(ensuredFuture); + assertTrue(ensuredFuture.isDone()); + assertFalse(ensuredFuture.isCompletedExceptionally()); + assertFalse(ensuredFuture.isCancelled()); + ensureLatch.await(); + } + + @Test + public void testEnsureFailure() throws Exception { + CountDownLatch ensureLatch = new CountDownLatch(1); + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> { + ensureLatch.countDown(); + }); + underlyFuture.completeExceptionally(new TestException()); + FutureUtils.result(FutureUtils.ignore(ensuredFuture)); + assertTrue(ensuredFuture.isDone()); + assertTrue(ensuredFuture.isCompletedExceptionally()); + assertFalse(ensuredFuture.isCancelled()); + ensureLatch.await(); + } + + @Test + public void testRescueSuccess() throws Exception { + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class); + CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc); + underlyFuture.complete(1234L); + FutureUtils.result(rescuedFuture); + assertTrue(rescuedFuture.isDone()); + assertFalse(rescuedFuture.isCompletedExceptionally()); + 
assertFalse(rescuedFuture.isCancelled()); + verify(rescueFuc, times(0)).apply(any(Throwable.class)); + } + + @Test + public void testRescueFailure() throws Exception { + CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L); + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue); + underlyFuture.completeExceptionally(new TestException()); + FutureUtils.result(rescuedFuture); + assertTrue(rescuedFuture.isDone()); + assertFalse(rescuedFuture.isCompletedExceptionally()); + assertFalse(rescuedFuture.isCancelled()); + assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture)); + } + + @Test + public void testStatsSuccess() throws Exception { + OpStatsLogger statsLogger = mock(OpStatsLogger.class); + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Long> statsFuture = FutureUtils.stats( + underlyFuture, + statsLogger, + Stopwatch.createStarted()); + underlyFuture.complete(1234L); + FutureUtils.result(statsFuture); + verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong()); + } + + @Test + public void testStatsFailure() throws Exception { + OpStatsLogger statsLogger = mock(OpStatsLogger.class); + CompletableFuture<Long> underlyFuture = FutureUtils.createFuture(); + CompletableFuture<Long> statsFuture = FutureUtils.stats( + underlyFuture, + statsLogger, + Stopwatch.createStarted()); + underlyFuture.completeExceptionally(new TestException()); + FutureUtils.result(FutureUtils.ignore(statsFuture)); + verify(statsLogger, times(1)).registerFailedEvent(anyLong()); + } + + @Test + public void testProcessListSuccess() throws Exception { + List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator()); + List<Long> expectedList = Lists.transform( + longList, + aLong -> 2 * aLong); + Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value); + 
CompletableFuture<List<Long>> totalFuture = FutureUtils.processList( + longList, + sumFunc, + null); + assertEquals(expectedList, FutureUtils.result(totalFuture)); + } + + @Test + public void testProcessEmptyList() throws Exception { + List<Long> longList = Lists.newArrayList(); + List<Long> expectedList = Lists.transform( + longList, + aLong -> 2 * aLong); + Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value); + CompletableFuture<List<Long>> totalFuture = FutureUtils.processList( + longList, + sumFunc, + null); + assertEquals(expectedList, FutureUtils.result(totalFuture)); + } + + @Test + public void testProcessListFailures() throws Exception { + List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator()); + AtomicLong total = new AtomicLong(0L); + Function<Long, CompletableFuture<Long>> sumFunc = value -> { + if (value < 5) { + total.addAndGet(value); + return FutureUtils.value(2 * value); + } else { + return FutureUtils.exception(new TestException()); + } + }; + CompletableFuture<List<Long>> totalFuture = FutureUtils.processList( + longList, + sumFunc, + null); + try { + FutureUtils.result(totalFuture); + fail("Should fail with TestException"); + } catch (TestException te) { + // as expected + } + assertEquals(10L, total.get()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java new file mode 100644 index 0000000..6b3ca58 --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.config; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Writer to write properties to files. 
+ */ +public class PropertiesWriter { + static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class); + + final FileOutputStream outputStream; + final File configFile; + final Properties properties; + + public PropertiesWriter() throws Exception { + this(null); + } + + public PropertiesWriter(File configFile) throws Exception { + if (null == configFile) { + this.configFile = File.createTempFile("temp", ".conf"); + } else { + this.configFile = configFile; + } + this.configFile.deleteOnExit(); + this.properties = new Properties(); + this.outputStream = new FileOutputStream(this.configFile); + } + + public void setProperty(String key, String value) { + properties.setProperty(key, value); + } + + public void removeProperty(String key) { + properties.remove(key); + } + + public void save() throws Exception { + FileOutputStream outputStream = new FileOutputStream(configFile); + properties.store(outputStream, null); + configFile.setLastModified(configFile.lastModified() + 1000); + LOG.debug("save modified={}", configFile.lastModified()); + } + + public File getFile() { + return configFile; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java new file mode 100644 index 0000000..a54faa0 --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.config; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Unit test of {@link ConcurrentBaseConfiguration}. + */ +public class TestConcurrentBaseConfiguration { + + @Test(timeout = 20000) + public void testBasicOperations() throws Exception { + ConcurrentBaseConfiguration conf = new ConcurrentBaseConfiguration(); + conf.setProperty("prop1", "1"); + assertEquals(1, conf.getInt("prop1")); + conf.setProperty("prop1", "2"); + assertEquals(2, conf.getInt("prop1")); + conf.clearProperty("prop1"); + assertEquals(null, conf.getInteger("prop1", null)); + conf.setProperty("prop1", "1"); + conf.setProperty("prop2", "2"); + assertEquals(1, conf.getInt("prop1")); + assertEquals(2, conf.getInt("prop2")); + conf.clearProperty("prop1"); + assertEquals(null, conf.getInteger("prop1", null)); + assertEquals(2, conf.getInt("prop2")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java new file mode 100644 index 0000000..a474f89 --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.common.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.event.ConfigurationEvent; +import org.apache.commons.configuration.event.ConfigurationListener; +import org.jmock.lib.concurrent.DeterministicScheduler; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Notes: + * 1. lastModified granularity is platform dependent, generally 1 sec, so we can't wait 1ms for things to + * get picked up. + */ +public class TestConfigurationSubscription { + static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class); + + /** + * Give FileChangedReloadingStrategy some time to start reloading. 
+ * + * <p>Make sure now!=lastChecked + * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} + */ + private void ensureConfigReloaded() throws InterruptedException { + // sleep 1 ms so that System.currentTimeMillis() != + // lastChecked (the time we construct FileChangedReloadingStrategy + Thread.sleep(1); + } + + @Test(timeout = 60000) + public void testReloadConfiguration() throws Exception { + PropertiesWriter writer = new PropertiesWriter(); + FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); + ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration()); + DeterministicScheduler executorService = new DeterministicScheduler(); + List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); + ConfigurationSubscription confSub = + new ConfigurationSubscription(conf, fileConfigBuilders, executorService, 100, TimeUnit.MILLISECONDS); + final AtomicReference<ConcurrentBaseConfiguration> confHolder = new AtomicReference<>(); + confSub.registerListener(new org.apache.distributedlog.common.config.ConfigurationListener() { + @Override + public void onReload(ConcurrentBaseConfiguration conf) { + confHolder.set(conf); + } + }); + assertEquals(null, conf.getProperty("prop1")); + + // add + writer.setProperty("prop1", "1"); + writer.save(); + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); + // reload the config + confSub.reload(); + assertNotNull(confHolder.get()); + assertTrue(conf == confHolder.get()); + assertEquals("1", conf.getProperty("prop1")); + } + + @Test(timeout = 60000) + public void testAddReloadBasicsConfig() throws Exception { + PropertiesWriter writer = new PropertiesWriter(); + DeterministicScheduler mockScheduler = new DeterministicScheduler(); + FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); + 
ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration()); + List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); + ConfigurationSubscription confSub = + new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); + assertEquals(null, conf.getProperty("prop1")); + + // add + writer.setProperty("prop1", "1"); + writer.save(); + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); + mockScheduler.tick(100, TimeUnit.MILLISECONDS); + assertEquals("1", conf.getProperty("prop1")); + + } + + @Test(timeout = 60000) + public void testInitialConfigLoad() throws Exception { + PropertiesWriter writer = new PropertiesWriter(); + writer.setProperty("prop1", "1"); + writer.setProperty("prop2", "abc"); + writer.setProperty("prop3", "123.0"); + writer.setProperty("prop4", "11132"); + writer.setProperty("prop5", "true"); + writer.save(); + + ScheduledExecutorService mockScheduler = new DeterministicScheduler(); + FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); + ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration()); + List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); + ConfigurationSubscription confSub = + new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); + assertEquals(1, conf.getInt("prop1")); + assertEquals("abc", conf.getString("prop2")); + assertEquals(123.0, conf.getFloat("prop3"), 0); + assertEquals(11132, conf.getInt("prop4")); + assertEquals(true, conf.getBoolean("prop5")); + } + + @Test(timeout = 60000) + public void testExceptionInConfigLoad() throws Exception { + PropertiesWriter writer = new PropertiesWriter(); + writer.setProperty("prop1", "1"); + writer.save(); + + DeterministicScheduler mockScheduler = new DeterministicScheduler(); + 
FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL()); + ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration()); + List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder); + ConfigurationSubscription confSub = + new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS); + + final AtomicInteger count = new AtomicInteger(1); + conf.addConfigurationListener(new ConfigurationListener() { + @Override + public void configurationChanged(ConfigurationEvent event) { + LOG.info("config changed {}", event); + // Throw after so we actually see the update anyway. + if (!event.isBeforeUpdate()) { + count.getAndIncrement(); + throw new RuntimeException("config listener threw and exception"); + } + } + }); + + int i = 0; + int initial = 0; + while (count.get() == initial) { + writer.setProperty("prop1", Integer.toString(i++)); + writer.save(); + mockScheduler.tick(100, TimeUnit.MILLISECONDS); + } + + initial = count.get(); + while (count.get() == initial) { + writer.setProperty("prop1", Integer.toString(i++)); + writer.save(); + mockScheduler.tick(100, TimeUnit.MILLISECONDS); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java new file mode 100644 index 0000000..7a981d1 --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.notification.Failure; + +/** + * Test Case for {@link TimedOutTestsListener}. 
+ */
public class TestTimedOutTestsListener {

    /**
     * Fixture that deliberately deadlocks six daemon threads.
     *
     * <p>Threads "MThread-1..3" form a cycle over intrinsic monitors and threads
     * "SThread-4..6" form a cycle over {@link ReentrantLock}s. A shared
     * {@link CyclicBarrier} makes every thread hold its first lock before any of
     * them asks for its second one.
     */
    private static class Deadlock {
        private static final int NUM_THREADS = 6;

        private final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);

        public Deadlock() {
            DeadlockThread[] dThreads = new DeadlockThread[NUM_THREADS];

            Monitor a = new Monitor("a");
            Monitor b = new Monitor("b");
            Monitor c = new Monitor("c");
            dThreads[0] = new DeadlockThread("MThread-1", a, b);
            dThreads[1] = new DeadlockThread("MThread-2", b, c);
            dThreads[2] = new DeadlockThread("MThread-3", c, a);

            Lock d = new ReentrantLock();
            Lock e = new ReentrantLock();
            Lock f = new ReentrantLock();

            dThreads[3] = new DeadlockThread("SThread-4", d, e);
            dThreads[4] = new DeadlockThread("SThread-5", e, f);
            dThreads[5] = new DeadlockThread("SThread-6", f, d);

            // make them daemon threads so that the test will exit
            for (DeadlockThread thread : dThreads) {
                thread.setDaemon(true);
                thread.start();
            }
        }

        /**
         * Thread that takes its first lock, waits for its peers on the barrier and
         * then blocks on its second lock.
         */
        class DeadlockThread extends Thread {
            private final Lock lock1;

            private final Lock lock2;

            private final Monitor mon1;

            private final Monitor mon2;

            // true: deadlock on java.util.concurrent locks; false: on object monitors
            private final boolean useLocks;

            DeadlockThread(String name, Lock lock1, Lock lock2) {
                super(name);
                this.lock1 = lock1;
                this.lock2 = lock2;
                this.mon1 = null;
                this.mon2 = null;
                this.useLocks = true;
            }

            DeadlockThread(String name, Monitor mon1, Monitor mon2) {
                super(name);
                this.lock1 = null;
                this.lock2 = null;
                this.mon1 = mon1;
                this.mon2 = mon2;
                this.useLocks = false;
            }

            @Override
            public void run() {
                if (useLocks) {
                    syncLock();
                } else {
                    monitorLock();
                }
            }

            private void syncLock() {
                lock1.lock();
                try {
                    try {
                        barrier.await();
                    } catch (Exception ignored) {
                        // best effort: a broken barrier only weakens the ordering guarantee
                    }
                    goSyncDeadlock();
                } finally {
                    lock1.unlock();
                }
            }

            private void goSyncDeadlock() {
                try {
                    barrier.await();
                } catch (Exception ignored) {
                    // best effort, see syncLock()
                }
                // every peer already owns its first lock, so this never returns
                lock2.lock();
                throw new RuntimeException("should not reach here.");
            }

            private void monitorLock() {
                synchronized (mon1) {
                    try {
                        barrier.await();
                    } catch (Exception ignored) {
                        // best effort: a broken barrier only weakens the ordering guarantee
                    }
                    goMonitorDeadlock();
                }
            }

            private void goMonitorDeadlock() {
                try {
                    barrier.await();
                } catch (Exception ignored) {
                    // best effort, see monitorLock()
                }
                // every peer already owns its first monitor, so this never enters
                synchronized (mon2) {
                    throw new RuntimeException(getName() + " should not reach here.");
                }
            }
        }

        /** Named object whose intrinsic monitor is used as a lock. */
        class Monitor {
            String name;

            Monitor(String name) {
                this.name = name;
            }
        }

    }

    /**
     * Polls until the listener reports a deadlock, then checks that exactly the three
     * monitor-deadlocked threads show up as BLOCKED and that a simulated timeout
     * failure produces both the thread dump and the deadlock section.
     */
    @Test(timeout = 500)
    public void testThreadDumpAndDeadlocks() throws Exception {
        new Deadlock();
        String s = null;
        while (true) {
            s = TimedOutTestsListener.buildDeadlockInfo();
            if (s != null) {
                break;
            }
            Thread.sleep(100);
        }

        Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED"));

        Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX));
        StringWriter writer = new StringWriter();
        new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure);
        String out = writer.toString();

        Assert.assertTrue(out.contains("THREAD DUMP"));
        Assert.assertTrue(out.contains("DEADLOCKS DETECTED"));

        System.out.println(out);
    }

    /**
     * Counts the occurrences of {@code substr} in {@code s}; the search resumes one
     * character after the start of each match.
     */
    private int countStringOccurrences(String s, String substr) {
        int n = 0;
        int index = s.indexOf(substr);
        while (index >= 0) {
            n++;
            index = s.indexOf(substr, index + 1);
        }
        return n;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java new file mode 100644 index 0000000..c86cf8f --- /dev/null +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.MonitorInfo; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.junit.runner.notification.Failure; +import org.junit.runner.notification.RunListener; + +/** + * JUnit run listener which prints full thread dump into System.err in case a test is failed due to + * timeout. 
+ */
public class TimedOutTestsListener extends RunListener {

    static final String TEST_TIMED_OUT_PREFIX = "test timed out after";

    // NOTE(review): literal reproduced as visible; its whitespace may have been
    // collapsed when this text was extracted -- confirm against the original file.
    private static final String INDENT = " ";

    private final PrintWriter output;

    /** Creates a listener that reports to {@code System.err}. */
    public TimedOutTestsListener() {
        this.output = new PrintWriter(System.err);
    }

    /**
     * Creates a listener that reports to the given writer.
     *
     * @param output destination of the diagnostic text
     */
    public TimedOutTestsListener(PrintWriter output) {
        this.output = output;
    }

    /**
     * Prints the thread diagnostics when the failure message starts with
     * {@link #TEST_TIMED_OUT_PREFIX}; every other failure is ignored.
     *
     * @param failure the reported failure, may be {@code null}
     */
    @Override
    public void testFailure(Failure failure) throws Exception {
        if (failure != null && failure.getMessage() != null
                && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) {
            output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <====");
            output.println();
            output.print(buildThreadDiagnosticString());
            // The System.err-backed writer is created without auto-flush, so the
            // dump would otherwise sit in its buffer and could be lost.
            output.flush();
        }
    }

    /**
     * Builds the complete report: a timestamp, the dump of all threads and, when
     * {@link #buildDeadlockInfo()} finds any, the deadlock section.
     *
     * @return the report text
     */
    public static String buildThreadDiagnosticString() {
        StringWriter sw = new StringWriter();
        PrintWriter output = new PrintWriter(sw);

        // HH (0-23): the pattern carries no AM/PM marker, so a 12-hour clock is ambiguous.
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        output.println(String.format("Timestamp: %s", dateFormat.format(new Date())));
        output.println();
        output.println(buildThreadDump());

        String deadlocksInfo = buildDeadlockInfo();
        if (deadlocksInfo != null) {
            output.println("====> DEADLOCKS DETECTED <====");
            output.println();
            output.println(deadlocksInfo);
        }

        return sw.toString();
    }

    /**
     * Formats name, daemon flag, priority, id, state and stack trace of every
     * thread returned by {@link Thread#getAllStackTraces()}.
     *
     * @return the thread dump text
     */
    static String buildThreadDump() {
        StringBuilder dump = new StringBuilder();
        Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
            Thread thread = e.getKey();
            // Sample the state once so the two places it is printed cannot disagree.
            Thread.State state = thread.getState();
            boolean waiting = Thread.State.WAITING.equals(state);
            dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s",
                    thread.getName(),
                    (thread.isDaemon() ? "daemon" : ""),
                    thread.getPriority(),
                    thread.getId(),
                    waiting ? "in Object.wait()" : StringUtils.lowerCase(state.name()),
                    waiting ? "WAITING (on object monitor)" : state));
            for (StackTraceElement stackTraceElement : e.getValue()) {
                dump.append("\n at ");
                dump.append(stackTraceElement);
            }
            dump.append("\n");
        }
        return dump.toString();
    }

    /**
     * Describes the threads reported by
     * {@link ThreadMXBean#findMonitorDeadlockedThreads()}.
     *
     * @return the description, or {@code null} when no such thread is reported
     */
    static String buildDeadlockInfo() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        long[] threadIds = threadBean.findMonitorDeadlockedThreads();
        if (threadIds == null || threadIds.length == 0) {
            return null;
        }

        StringWriter stringWriter = new StringWriter();
        PrintWriter out = new PrintWriter(stringWriter);

        ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, true, true);
        for (ThreadInfo ti : infos) {
            printThreadInfo(ti, out);
            printLockInfo(ti.getLockedSynchronizers(), out);
            out.println();
        }

        out.close();
        return stringWriter.toString();
    }

    /** Prints the thread header followed by its stack trace annotated with locked monitors. */
    private static void printThreadInfo(ThreadInfo ti, PrintWriter out) {
        // print thread information
        printThread(ti, out);

        // print stack trace with locks
        StackTraceElement[] stacktrace = ti.getStackTrace();
        MonitorInfo[] monitors = ti.getLockedMonitors();
        for (int i = 0; i < stacktrace.length; i++) {
            StackTraceElement ste = stacktrace[i];
            out.println(INDENT + "at " + ste.toString());
            for (MonitorInfo mi : monitors) {
                // a monitor is listed under the frame in which it was locked
                if (mi.getLockedStackDepth() == i) {
                    out.println(INDENT + " - locked " + mi);
                }
            }
        }
        out.println();
    }

    /** Prints the one-line thread header and, when known, the owner of the lock it waits for. */
    private static void printThread(ThreadInfo ti, PrintWriter out) {
        out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState());
        if (ti.getLockName() != null) {
            out.print(" on lock=" + ti.getLockName());
        }
        if (ti.isSuspended()) {
            out.print(" (suspended)");
        }
        if (ti.isInNative()) {
            out.print(" (running in native)");
        }
        out.println();
        if (ti.getLockOwnerName() != null) {
            out.println(INDENT + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId());
        }
    }

    /** Prints the number of locked synchronizers followed by one line per synchronizer. */
    private static void printLockInfo(LockInfo[] locks, PrintWriter out) {
        out.println(INDENT + "Locked synchronizers: count = " + locks.length);
        for (LockInfo li : locks) {
            out.println(INDENT + " - " + li);
        }
        out.println();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml new file mode 100644 index 0000000..41116b9 --- /dev/null +++ b/distributedlog-core-twitter/pom.xml @@ -0,0 +1,141 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog</artifactId> + <version>0.5.0-incubating-SNAPSHOT</version> + </parent> + <artifactId>distributedlog-core-twitter</artifactId> + <name>Apache DistributedLog :: Core Library (Twitter Future Interface)</name> + <dependencies> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>util-core_2.11</artifactId> + <version>${finagle.version}</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-common</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-compiler-plugin.version}</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${maven-jar-plugin.version}</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + 
</plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine> + <forkMode>always</forkMode> + <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> + <properties> + <property> + <name>listener</name> + <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value> + </property> + </properties> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>${maven-checkstyle-plugin.version}</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>${puppycrawl.checkstyle.version}</version> + </dependency> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-build-tools</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <configuration> + <configLocation>distributedlog/checkstyle.xml</configLocation> + <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeResources>false</includeResources> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + <executions> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java new file mode 100644 index 0000000..4ec1dfa --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.twitter.util.Future; +import java.util.List; +import java.util.concurrent.TimeUnit; +
/**
 * A log reader that reads records asynchronously.
 */
public interface AsyncLogReader {

    /**
     * Get the name of the stream that this reader reads from.
     *
     * @return stream name.
     */
    String getStreamName();

    /**
     * Read the next record from the log stream.
     *
     * @return A promise that, when satisfied, contains the log record with its DLSN.
     */
    Future<LogRecordWithDLSN> readNext();

    /**
     * Read the next <i>numEntries</i> entries. The future is only satisfied with a non-empty
     * list of entries. This is a best-effort call: it does not wait until exactly
     * <i>numEntries</i> entries are available.
     *
     * @param numEntries
     *          number of entries to read
     * @return A promise that, when satisfied, contains a non-empty list of records with their DLSN.
     */
    Future<List<LogRecordWithDLSN>> readBulk(int numEntries);

    /**
     * Read the next <i>numEntries</i> entries within a given <i>waitTime</i>.
     *
     * <p>The future is satisfied when either <i>numEntries</i> entries have been read or
     * <i>waitTime</i> has elapsed. The only exception: if no new entry is written within
     * <i>waitTime</i>, the call waits until new entries are available.
     *
     * @param numEntries
     *          max entries to return
     * @param waitTime
     *          maximum wait time if there are already entries available to read
     * @param timeUnit
     *          wait time unit
     * @return A promise that, when satisfied, contains a non-empty list of records with their DLSN.
     */
    Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);

    /**
     * Closes this source and releases any system resources associated
     * with it. If the source is already closed then invoking this
     * method has no effect.
     *
     * @return future representing the close result.
     */
    Future<Void> asyncClose();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java new file mode 100644 index 0000000..4f4a90e --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.apache.distributedlog.util.FutureUtils.newTFuture; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.util.Future; +import java.util.List; +import java.util.concurrent.TimeUnit; +import scala.Function1; +import scala.runtime.AbstractFunction1; + +/** + * Implementation wrapper of {@link org.apache.distributedlog.api.AsyncLogReader}. 
+ */
class AsyncLogReaderImpl implements AsyncLogReader {

    /** Wraps an api-level reader into an {@link AsyncLogReaderImpl}. */
    static final Function1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader> MAP_FUNC =
        new AbstractFunction1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader>() {
            @Override
            public AsyncLogReader apply(org.apache.distributedlog.api.AsyncLogReader underlying) {
                return new AsyncLogReaderImpl(underlying);
            }
        };

    // the api-level reader every call is forwarded to
    private final org.apache.distributedlog.api.AsyncLogReader delegate;

    AsyncLogReaderImpl(org.apache.distributedlog.api.AsyncLogReader impl) {
        this.delegate = impl;
    }

    /**
     * Exposes the wrapped reader.
     *
     * @return the reader passed to the constructor
     */
    @VisibleForTesting
    org.apache.distributedlog.api.AsyncLogReader getImpl() {
        return delegate;
    }

    /** Forwards to the wrapped reader. */
    @Override
    public String getStreamName() {
        return delegate.getStreamName();
    }

    /** Forwards to the wrapped reader and hands its result to {@code newTFuture}. */
    @Override
    public Future<LogRecordWithDLSN> readNext() {
        return newTFuture(delegate.readNext());
    }

    /** Forwards to the wrapped reader and hands its result to {@code newTFuture}. */
    @Override
    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
        return newTFuture(delegate.readBulk(numEntries));
    }

    /** Forwards to the wrapped reader and hands its result to {@code newTFuture}. */
    @Override
    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit) {
        return newTFuture(delegate.readBulk(numEntries, waitTime, timeUnit));
    }

    /** Forwards to the wrapped reader and hands its result to {@code newTFuture}. */
    @Override
    public Future<Void> asyncClose() {
        return newTFuture(delegate.asyncClose());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java new file mode 100644 index 0000000..915877c --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.twitter.util.Future; +import java.util.List; +
/**
 * A writer that appends log records asynchronously.
 */
public interface AsyncLogWriter {

    /**
     * Get the last committed transaction id.
     *
     * @return last committed transaction id.
     */
    long getLastTxId();

    /**
     * Write a log record to the stream.
     *
     * @param record single log record
     * @return A Future which contains a DLSN if the record was successfully written
     *         or an exception if the write fails
     */
    Future<DLSN> write(LogRecord record);

    /**
     * Write log records to the stream in bulk. Each future in the list represents the result of
     * one write operation. The size of the result list is equal to the size of the input list.
     * Buffers are written in order, and the list of result futures has the same order.
     *
     * @param record list of log records
     * @return A Future which contains a list of Future DLSNs, one per input record,
     *         or an exception if the operation fails.
     */
    Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);

    /**
     * Truncate the log until <i>dlsn</i>.
     *
     * @param dlsn
     *          dlsn to truncate until.
     * @return A Future that indicates whether the operation succeeded, or an exception
     *         if the truncation fails.
     */
    Future<Boolean> truncate(DLSN dlsn);

    /**
     * Get the name of the stream this writer writes data to.
     *
     * @return stream name.
     */
    String getStreamName();

    /**
     * Closes this source and releases any system resources associated
     * with it. If the source is already closed then invoking this
     * method has no effect.
     *
     * @return future representing the close result.
     */
    Future<Void> asyncClose();

    /**
     * Aborts the object and releases any resources associated with it.
     * If the object is already aborted then invoking this method has no
     * effect.
     *
     * @return future representing the abort result.
     */
    Future<Void> asyncAbort();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java new file mode 100644 index 0000000..dc28bb1 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.apache.distributedlog.util.FutureUtils.newTFuture; +import static org.apache.distributedlog.util.FutureUtils.newTFutureList; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.util.Future; +import java.util.List; +import scala.Function1; +import scala.runtime.AbstractFunction1; +
/**
 * The implementation of {@link AsyncLogWriter} built over {@link org.apache.distributedlog.api.AsyncLogWriter}.
 */
class AsyncLogWriterImpl implements AsyncLogWriter {

    /** Wraps an api-level writer into an {@link AsyncLogWriterImpl}. */
    static final Function1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter> MAP_FUNC =
        new AbstractFunction1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter>() {
            @Override
            public AsyncLogWriter apply(org.apache.distributedlog.api.AsyncLogWriter underlying) {
                return new AsyncLogWriterImpl(underlying);
            }
        };

    // the api-level writer every call is forwarded to
    private final org.apache.distributedlog.api.AsyncLogWriter delegate;

    AsyncLogWriterImpl(org.apache.distributedlog.api.AsyncLogWriter impl) {
        this.delegate = impl;
    }

    /**
     * Exposes the wrapped writer.
     *
     * @return the writer passed to the constructor
     */
    @VisibleForTesting
    org.apache.distributedlog.api.AsyncLogWriter getImpl() {
        return delegate;
    }

    /** Forwards to the wrapped writer. */
    @Override
    public long getLastTxId() {
        return delegate.getLastTxId();
    }

    /** Forwards to the wrapped writer and hands its result to {@code newTFuture}. */
    @Override
    public Future<DLSN> write(LogRecord record) {
        return newTFuture(delegate.write(record));
    }

    /** Forwards to the wrapped writer and hands its result to {@code newTFutureList}. */
    @Override
    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record) {
        return newTFutureList(delegate.writeBulk(record));
    }

    /** Forwards to the wrapped writer and hands its result to {@code newTFuture}. */
    @Override
    public Future<Boolean> truncate(DLSN dlsn) {
        return newTFuture(delegate.truncate(dlsn));
    }

    /** Forwards to the wrapped writer. */
    @Override
    public String getStreamName() {
        return delegate.getStreamName();
    }

    /** Forwards to the wrapped writer and hands its result to {@code newTFuture}. */
    @Override
    public Future<Void> asyncClose() {
        return newTFuture(delegate.asyncClose());
    }

    /** Forwards to the wrapped writer and hands its result to {@code newTFuture}. */
    @Override
    public Future<Void> asyncAbort() {
        return newTFuture(delegate.asyncAbort());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java new file mode 100644 index 0000000..14f05c3 --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.twitter.util.Future; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.subscription.SubscriptionsStore; + +/** + * A DistributedLogManager is responsible for managing a single place of storing + * edit logs. It may correspond to multiple files, a backup node, etc. + * Even when the actual underlying storage is rolled, or failed and restored, + * each conceptual place of storage corresponds to exactly one instance of + * this class, which is created when the EditLog is first opened. + */ +public interface DistributedLogManager extends Closeable { + + /** + * Get the name of the stream managed by this log manager. + * + * @return streamName + */ + String getStreamName(); + + /** + * Get the namespace driver used by this manager. + * + * @return the namespace driver + */ + NamespaceDriver getNamespaceDriver(); + + /** + * Get log segments. + * + * @return log segments + * @throws IOException + */ + List<LogSegmentMetadata> getLogSegments() throws IOException; + + /** + * Register <i>listener</i> on log segment updates of this stream. + * + * @param listener + * listener to receive update log segment list. + */ + void registerListener(LogSegmentListener listener) throws IOException; + + /** + * Unregister <i>listener</i> on log segment updates from this stream. + * + * @param listener + * listener to receive update log segment list. + */ + void unregisterListener(LogSegmentListener listener); + + /** + * Open async log writer to write records to the log stream. + * + * @return result represents the open result + */ + Future<AsyncLogWriter> openAsyncLogWriter(); + + /** + * Begin writing to the log stream identified by the name. 
+ * + * @return the writer interface to generate log records + */ + LogWriter startLogSegmentNonPartitioned() throws IOException; + + /** + * Begin writing to the log stream identified by the name. + * + * @return the writer interface to generate log records + */ + // @Deprecated + AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; + + /** + * Begin appending to the end of the log stream which is being treated as a sequence of bytes. + * + * @return the writer interface to generate log records + */ + AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; + + /** + * Get a reader to read a log stream as a sequence of bytes. + * + * @return the writer interface to generate log records + */ + AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; + + /** + * Get the input stream starting with fromTxnId for the specified log. + * + * @param fromTxnId - the first transaction id we want to read + * @return the stream starting with transaction fromTxnId + * @throws IOException if a stream cannot be found. + */ + LogReader getInputStream(long fromTxnId) + throws IOException; + + LogReader getInputStream(DLSN fromDLSN) throws IOException; + + /** + * Open an async log reader to read records from a log starting from <code>fromTxnId</code>. + * + * @param fromTxnId + * transaction id to start reading from + * @return async log reader + */ + Future<AsyncLogReader> openAsyncLogReader(long fromTxnId); + + /** + * Open an async log reader to read records from a log starting from <code>fromDLSN</code>. 
+ * + * @param fromDLSN + * dlsn to start reading from + * @return async log reader + */ + Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN); + + // @Deprecated + AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; + + // @Deprecated + AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; + + Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); + + /** + * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>. + * If two readers try to open using the same subscriberId, one will succeed, while the other + * will be blocked until it gets the lock. + * + * @param fromDLSN + * start dlsn + * @param subscriberId + * subscriber id + * @return async log reader + */ + Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId); + + /** + * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from + * its last commit position recorded in the subscription store. If no last commit position is found + * in the subscription store, it will start reading from the head of the stream. + * + * <p>If two readers try to open using the same subscriberId, one will succeed, while the other + * will be blocked until it gets the lock. + * + * @param subscriberId + * subscriber id + * @return async log reader + */ + Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId); + + /** + * Get the {@link DLSN} of the first log record whose transaction id is not less than <code>transactionId</code>. + * + * @param transactionId + * transaction id + * @return dlsn of the first log record whose transaction id is not less than transactionId. + */ + Future<DLSN> getDLSNNotLessThanTxId(long transactionId); + + /** + * Get the last log record in the stream. + * + * @return the last log record in the stream + * @throws IOException if a stream cannot be found.
+ */ + LogRecordWithDLSN getLastLogRecord() + throws IOException; + + /** + * Get the earliest Transaction Id available in the log. + * + * @return earliest transaction id + * @throws IOException + */ + long getFirstTxId() throws IOException; + + /** + * Get Latest Transaction Id in the log. + * + * @return latest transaction id + * @throws IOException + */ + long getLastTxId() throws IOException; + + /** + * Get Latest DLSN in the log. + * + * @return last dlsn + * @throws IOException + */ + DLSN getLastDLSN() throws IOException; + + /** + * Get Latest log record with DLSN in the log - async. + * + * @return latest log record with DLSN + */ + Future<LogRecordWithDLSN> getLastLogRecordAsync(); + + /** + * Get Latest Transaction Id in the log - async. + * + * @return latest transaction id + */ + Future<Long> getLastTxIdAsync(); + + /** + * Get first DLSN in the log - async. + * + * @return first dlsn in the stream + */ + Future<DLSN> getFirstDLSNAsync(); + + /** + * Get Latest DLSN in the log - async. + * + * @return latest dlsn + */ + Future<DLSN> getLastDLSNAsync(); + + /** + * Get the number of log records in the active portion of the log. + * + * <p>Any log segments that have already been truncated will not be included. + * + * @return number of log records + * @throws IOException + */ + long getLogRecordCount() throws IOException; + + /** + * Get the number of log records in the active portion of the log - async. + * + * <p>Any log segments that have already been truncated will not be included. + * + * @param beginDLSN dlsn marking the beginning of the range to count + * @return future number of log records + */ + Future<Long> getLogRecordCountAsync(final DLSN beginDLSN); + + /** + * Run recovery on the log. + * + * @throws IOException + */ + void recover() throws IOException; + + /** + * Check if an end of stream marker was added to the stream. + * A stream with an end of stream marker cannot be appended to.
+ * + * @return true if the marker was added to the stream, false otherwise + * @throws IOException + */ + boolean isEndOfStreamMarked() throws IOException; + + /** + * Delete the log. + * + * @throws IOException if the deletion fails + */ + void delete() throws IOException; + + /** + * The DistributedLogManager may archive/purge any logs for transactionId + * less than minTxIdToKeep. + * This is to be used only when the client explicitly manages deletion. If + * the cleanup policy is based on sliding time window, then this method need + * not be called. + * + * @param minTxIdToKeep the earliest txid that must be retained + * @throws IOException if purging fails + */ + void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; + + /** + * Get the subscriptions store provided by the distributedlog manager. + * + * @return subscriptions store that manages subscriptions for the current stream. + */ + SubscriptionsStore getSubscriptionsStore(); + + /** + * Closes this source and releases any system resources associated + * with it. If the source is already closed then invoking this + * method has no effect. + * + * @return future representing the close result. + */ + Future<Void> asyncClose(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java new file mode 100644 index 0000000..aa3e94e --- /dev/null +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.apache.distributedlog.util.FutureUtils.newTFuture; + +import com.twitter.util.Future; +import java.io.IOException; +import java.util.List; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.subscription.SubscriptionsStore; + +/** + * The wrapper of {@link org.apache.distributedlog.api.DistributedLogManager}. 
+ */ +public class DistributedLogManagerImpl implements DistributedLogManager { + + private final org.apache.distributedlog.api.DistributedLogManager impl; + + public DistributedLogManagerImpl(org.apache.distributedlog.api.DistributedLogManager impl) { + this.impl = impl; + } + + @Override + public String getStreamName() { + return impl.getStreamName(); + } + + @Override + public NamespaceDriver getNamespaceDriver() { + return impl.getNamespaceDriver(); + } + + @Override + public List<LogSegmentMetadata> getLogSegments() throws IOException { + return impl.getLogSegments(); + } + + @Override + public void registerListener(LogSegmentListener listener) throws IOException { + impl.registerListener(listener); + } + + @Override + public void unregisterListener(LogSegmentListener listener) { + impl.unregisterListener(listener); + } + + @Override + public Future<AsyncLogWriter> openAsyncLogWriter() { + return newTFuture(impl.openAsyncLogWriter()).map(AsyncLogWriterImpl.MAP_FUNC); + } + + @Override + public LogWriter startLogSegmentNonPartitioned() throws IOException { + return new LogWriterImpl(impl.startLogSegmentNonPartitioned()); + } + + @Override + public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { + return new AsyncLogWriterImpl(impl.startAsyncLogSegmentNonPartitioned()); + } + + @Override + public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException { + return impl.getAppendOnlyStreamWriter(); + } + + @Override + public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException { + return impl.getAppendOnlyStreamReader(); + } + + @Override + public LogReader getInputStream(long fromTxnId) throws IOException { + return new LogReaderImpl(impl.getInputStream(fromTxnId)); + } + + @Override + public LogReader getInputStream(DLSN fromDLSN) throws IOException { + return new LogReaderImpl(impl.getInputStream(fromDLSN)); + } + + @Override + public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) { + return 
newTFuture(impl.openAsyncLogReader(fromTxnId)).map(AsyncLogReaderImpl.MAP_FUNC); + } + + @Override + public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { + return newTFuture(impl.openAsyncLogReader(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC); + } + + @Override + public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException { + return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromTxnId)); + } + + @Override + public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException { + return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromDLSN)); + } + + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN) { + return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC); + } + + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId) { + return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN, subscriberId)) + .map(AsyncLogReaderImpl.MAP_FUNC); + } + + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) { + return newTFuture(impl.getAsyncLogReaderWithLock(subscriberId)) + .map(AsyncLogReaderImpl.MAP_FUNC); + } + + @Override + public Future<DLSN> getDLSNNotLessThanTxId(long transactionId) { + return newTFuture(impl.getDLSNNotLessThanTxId(transactionId)); + } + + @Override + public LogRecordWithDLSN getLastLogRecord() throws IOException { + return impl.getLastLogRecord(); + } + + @Override + public long getFirstTxId() throws IOException { + return impl.getFirstTxId(); + } + + @Override + public long getLastTxId() throws IOException { + return impl.getLastTxId(); + } + + @Override + public DLSN getLastDLSN() throws IOException { + return impl.getLastDLSN(); + } + + @Override + public Future<LogRecordWithDLSN> getLastLogRecordAsync() { + return newTFuture(impl.getLastLogRecordAsync()); + } + + @Override + public Future<Long> getLastTxIdAsync() { + return newTFuture(impl.getLastTxIdAsync()); + 
} + + @Override + public Future<DLSN> getFirstDLSNAsync() { + return newTFuture(impl.getFirstDLSNAsync()); + } + + @Override + public Future<DLSN> getLastDLSNAsync() { + return newTFuture(impl.getLastDLSNAsync()); + } + + @Override + public long getLogRecordCount() throws IOException { + return impl.getLogRecordCount(); + } + + @Override + public Future<Long> getLogRecordCountAsync(DLSN beginDLSN) { + return newTFuture(impl.getLogRecordCountAsync(beginDLSN)); + } + + @Override + public void recover() throws IOException { + impl.recover(); + } + + @Override + public boolean isEndOfStreamMarked() throws IOException { + return impl.isEndOfStreamMarked(); + } + + @Override + public void delete() throws IOException { + impl.delete(); + } + + @Override + public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { + impl.purgeLogsOlderThan(minTxIdToKeep); + } + + @Override + public SubscriptionsStore getSubscriptionsStore() { + return new SubscriptionsStoreImpl(impl.getSubscriptionsStore()); + } + + @Override + public void close() throws IOException { + impl.close(); + } + + @Override + public Future<Void> asyncClose() { + return newTFuture(impl.asyncClose()); + } +}