http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java new file mode 100644 index 0000000..bed516d --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.api.LogReader; +import org.apache.distributedlog.api.LogWriter; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.util.FutureUtils; +import org.junit.Test; + +/** + * Unit test of {@link DistributedLogManagerImpl}. + */ +public class TestDistributedLogManagerImpl { + + private final org.apache.distributedlog.api.DistributedLogManager impl = + mock(org.apache.distributedlog.api.DistributedLogManager.class); + private final DistributedLogManagerImpl manager = new DistributedLogManagerImpl(impl); + + @Test + public void testGetStreamName() throws Exception { + String name = "test-get-stream-name"; + when(impl.getStreamName()).thenReturn(name); + assertEquals(name, manager.getStreamName()); + verify(impl, times(1)).getStreamName(); + } + + @Test + public void testGetNamespaceDriver() throws Exception { + NamespaceDriver driver = mock(NamespaceDriver.class); + when(impl.getNamespaceDriver()).thenReturn(driver); + assertEquals(driver, manager.getNamespaceDriver()); + verify(impl, times(1)).getNamespaceDriver(); + } + + @Test + public void testGetLogSegments() throws Exception { + List<LogSegmentMetadata> segments = mock(List.class); + when(impl.getLogSegments()).thenReturn(segments); + assertEquals(segments, manager.getLogSegments()); + verify(impl, times(1)).getLogSegments(); + } + + @Test + public void testRegisterListener() throws Exception { + LogSegmentListener listener = mock(LogSegmentListener.class); + manager.registerListener(listener); + verify(impl, times(1)).registerListener(listener); + } + + @Test + public void testUnregisterListener() throws Exception { + LogSegmentListener listener = mock(LogSegmentListener.class); + manager.unregisterListener(listener); + verify(impl, times(1)).unregisterListener(listener); + } + + @Test + public void testOpenAsyncLogWriter() throws Exception { + AsyncLogWriter writer = mock(AsyncLogWriter.class); + when(impl.openAsyncLogWriter()).thenReturn(CompletableFuture.completedFuture(writer)); + assertEquals(writer, ((AsyncLogWriterImpl) FutureUtils.result(manager.openAsyncLogWriter())).getImpl()); + verify(impl, times(1)).openAsyncLogWriter(); + } + + @Test + public void testStartLogSegmentNonPartitioned() throws Exception { + LogWriter writer = mock(LogWriter.class); + when(impl.startLogSegmentNonPartitioned()).thenReturn(writer); + assertEquals(writer, ((LogWriterImpl) manager.startLogSegmentNonPartitioned()).getImpl()); + verify(impl, times(1)).startLogSegmentNonPartitioned(); + } + + @Test + public void testStartAsyncLogSegmentNonPartitioned() throws Exception { + AsyncLogWriter writer = mock(AsyncLogWriter.class); + when(impl.startAsyncLogSegmentNonPartitioned()).thenReturn(writer); + assertEquals(writer, ((AsyncLogWriterImpl) manager.startAsyncLogSegmentNonPartitioned()).getImpl()); + verify(impl, times(1)).startAsyncLogSegmentNonPartitioned(); + } + + @Test + public void testGetAppendOnlyStreamWriter() throws Exception { + AppendOnlyStreamWriter writer = mock(AppendOnlyStreamWriter.class); + when(impl.getAppendOnlyStreamWriter()).thenReturn(writer); + assertEquals(writer, manager.getAppendOnlyStreamWriter()); + verify(impl, times(1)).getAppendOnlyStreamWriter(); + } + + @Test + public void testGetAppendOnlyStreamReader() throws Exception { + AppendOnlyStreamReader writer = mock(AppendOnlyStreamReader.class); + when(impl.getAppendOnlyStreamReader()).thenReturn(writer); + assertEquals(writer, manager.getAppendOnlyStreamReader()); + verify(impl, times(1)).getAppendOnlyStreamReader(); + } + + @Test + public void testGetInputStream() throws Exception { + LogReader reader = mock(LogReader.class); + when(impl.getInputStream(anyLong())).thenReturn(reader); + assertEquals(reader, ((LogReaderImpl) manager.getInputStream(1234L)).getImpl()); + verify(impl, times(1)).getInputStream(eq(1234L)); + } + + @Test + public void testGetInputStream2() throws Exception { + DLSN dlsn = mock(DLSN.class); + LogReader reader = mock(LogReader.class); + when(impl.getInputStream(eq(dlsn))).thenReturn(reader); + assertEquals(reader, ((LogReaderImpl) manager.getInputStream(dlsn)).getImpl()); + verify(impl, times(1)).getInputStream(eq(dlsn)); + } + + @Test + public void testOpenAsyncLogReader() throws Exception { + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.openAsyncLogReader(eq(1234L))).thenReturn(CompletableFuture.completedFuture(reader)); + assertEquals(reader, + ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(1234L))).getImpl()); + verify(impl, times(1)).openAsyncLogReader(eq(1234L)); + } + + @Test + public void testOpenAsyncLogReader2() throws Exception { + DLSN dlsn = mock(DLSN.class); + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.openAsyncLogReader(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader)); + assertEquals(reader, + ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(dlsn))).getImpl()); + verify(impl, times(1)).openAsyncLogReader(eq(dlsn)); + } + + @Test + public void testGetAsyncLogReader() throws Exception { + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.getAsyncLogReader(eq(1234L))).thenReturn(reader); + assertEquals(reader, + ((AsyncLogReaderImpl) manager.getAsyncLogReader(1234L)).getImpl()); + verify(impl, times(1)).getAsyncLogReader(eq(1234L)); + } + + @Test + public void testGetAsyncLogReader2() throws Exception { + DLSN dlsn = mock(DLSN.class); + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.getAsyncLogReader(eq(dlsn))).thenReturn(reader); + assertEquals(reader, + ((AsyncLogReaderImpl) manager.getAsyncLogReader(dlsn)).getImpl()); + verify(impl, times(1)).getAsyncLogReader(eq(dlsn)); + } + + @Test + public void testOpenAsyncLogReaderWithLock() throws Exception { + DLSN dlsn = mock(DLSN.class); + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.getAsyncLogReaderWithLock(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader)); + assertEquals(reader, + ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn))).getImpl()); + verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn)); + } + + @Test + public void testOpenAsyncLogReaderWithLock2() throws Exception { + String subscriberId = "test-subscriber"; + DLSN dlsn = mock(DLSN.class); + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId))) + .thenReturn(CompletableFuture.completedFuture(reader)); + assertEquals(reader, + ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn, subscriberId))).getImpl()); + verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId)); + } + + @Test + public void testOpenAsyncLogReaderWithLock3() throws Exception { + String subscriberId = "test-subscriber"; + org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class); + when(impl.getAsyncLogReaderWithLock(eq(subscriberId))) + .thenReturn(CompletableFuture.completedFuture(reader)); + assertEquals(reader, + ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(subscriberId))).getImpl()); + verify(impl, times(1)).getAsyncLogReaderWithLock(eq(subscriberId)); + } + + @Test + public void testGetDLSNNotLessThanTxId() throws Exception { + DLSN dlsn = mock(DLSN.class); + when(impl.getDLSNNotLessThanTxId(anyLong())).thenReturn(CompletableFuture.completedFuture(dlsn)); + assertEquals(dlsn, FutureUtils.result(manager.getDLSNNotLessThanTxId(1234L))); + verify(impl, times(1)).getDLSNNotLessThanTxId(eq(1234L)); + } + + @Test + public void testGetLastLogRecord() throws Exception { + LogRecordWithDLSN record = mock(LogRecordWithDLSN.class); + when(impl.getLastLogRecord()).thenReturn(record); + assertEquals(record, manager.getLastLogRecord()); + verify(impl, times(1)).getLastLogRecord(); + } + + @Test + public void testFirstTxId() throws Exception { + long txId = System.currentTimeMillis(); + when(impl.getFirstTxId()).thenReturn(txId); + assertEquals(txId, manager.getFirstTxId()); + verify(impl, times(1)).getFirstTxId(); + } + + @Test + public void testLastTxId() throws Exception { + long txId = System.currentTimeMillis(); + when(impl.getLastTxId()).thenReturn(txId); + assertEquals(txId, manager.getLastTxId()); + verify(impl, times(1)).getLastTxId(); + } + + @Test + public void testLastDLSN() throws Exception { + DLSN dlsn = mock(DLSN.class); + when(impl.getLastDLSN()).thenReturn(dlsn); + assertEquals(dlsn, manager.getLastDLSN()); + verify(impl, times(1)).getLastDLSN(); + } + + @Test + public void testGetLastLogRecordAsync() throws Exception { + LogRecordWithDLSN record = mock(LogRecordWithDLSN.class); + when(impl.getLastLogRecordAsync()).thenReturn(CompletableFuture.completedFuture(record)); + assertEquals(record, FutureUtils.result(manager.getLastLogRecordAsync())); + verify(impl, times(1)).getLastLogRecordAsync(); + } + + @Test + public void testLastTxIdAsync() throws Exception { + long txId = System.currentTimeMillis(); + when(impl.getLastTxIdAsync()).thenReturn(CompletableFuture.completedFuture(txId)); + assertEquals(txId, FutureUtils.result(manager.getLastTxIdAsync()).longValue()); + verify(impl, times(1)).getLastTxIdAsync(); + } + + @Test + public void testLastDLSNAsync() throws Exception { + DLSN dlsn = mock(DLSN.class); + when(impl.getLastDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn)); + assertEquals(dlsn, FutureUtils.result(manager.getLastDLSNAsync())); + verify(impl, times(1)).getLastDLSNAsync(); + } + + @Test + public void testFirstDLSNAsync() throws Exception { + DLSN dlsn = mock(DLSN.class); + when(impl.getFirstDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn)); + assertEquals(dlsn, FutureUtils.result(manager.getFirstDLSNAsync())); + verify(impl, times(1)).getFirstDLSNAsync(); + } + + @Test + public void testGetLogRecordCount() throws Exception { + long count = System.currentTimeMillis(); + when(impl.getLogRecordCount()).thenReturn(count); + assertEquals(count, manager.getLogRecordCount()); + verify(impl, times(1)).getLogRecordCount(); + } + + @Test + public void testGetLogRecordCountAsync() throws Exception { + DLSN dlsn = mock(DLSN.class); + long count = System.currentTimeMillis(); + when(impl.getLogRecordCountAsync(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(count)); + assertEquals(count, FutureUtils.result(manager.getLogRecordCountAsync(dlsn)).longValue()); + verify(impl, times(1)).getLogRecordCountAsync(eq(dlsn)); + } + + @Test + public void testRecover() throws Exception { + manager.recover(); + verify(impl, times(1)).recover(); + } + + @Test + public void testIsEndOfStreamMarked() throws Exception { + when(impl.isEndOfStreamMarked()).thenReturn(true); + assertTrue(manager.isEndOfStreamMarked()); + verify(impl, times(1)).isEndOfStreamMarked(); + } + + @Test + public void testDelete() throws Exception { + manager.delete(); + verify(impl, times(1)).delete(); + } + + @Test + public void testPurgeLogsOlderThan() throws Exception { + long minTxIdToKeep = System.currentTimeMillis(); + manager.purgeLogsOlderThan(minTxIdToKeep); + verify(impl, times(1)).purgeLogsOlderThan(eq(minTxIdToKeep)); + } + + @Test + public void testGetSubscriptionsStore() throws Exception { + SubscriptionsStore ss = mock(SubscriptionsStore.class); + when(impl.getSubscriptionsStore()).thenReturn(ss); + assertEquals(ss, ((SubscriptionsStoreImpl) manager.getSubscriptionsStore()).getImpl()); + verify(impl, times(1)).getSubscriptionsStore(); + } + + @Test + public void testClose() throws Exception { + manager.close(); + verify(impl, times(1)).close(); + } + + @Test + public void testAsyncClose() throws Exception { + when(impl.asyncClose()).thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(manager.asyncClose()); + verify(impl, times(1)).asyncClose(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java new file mode 100644 index 0000000..4adc386 --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.util.FutureUtils; +import org.junit.Test; + +/** + * Unit test of {@link LogReaderImpl}. + */ +public class TestLogReaderImpl { + + private final org.apache.distributedlog.api.LogReader underlying = + mock(org.apache.distributedlog.api.LogReader.class); + private final LogReaderImpl reader = new LogReaderImpl(underlying); + + @Test + public void testReadNext() throws Exception { + reader.readNext(false); + verify(underlying, times(1)).readNext(eq(false)); + } + + @Test + public void testReadBulk() throws Exception { + reader.readBulk(false, 100); + verify(underlying, times(1)).readBulk(eq(false), eq(100)); + } + + @Test + public void testClose() throws Exception { + reader.close(); + verify(underlying, times(1)).close(); + } + + @Test + public void testAsyncClose() throws Exception { + when(underlying.asyncClose()) + .thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(reader.asyncClose()); + verify(underlying, times(1)).asyncClose(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java new file mode 100644 index 0000000..be69260 --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.List; +import org.junit.Test; + +/** + * Unit test of {@link LogWriterImpl}. + */ +public class TestLogWriterImpl { + + private final org.apache.distributedlog.api.LogWriter underlying = + mock(org.apache.distributedlog.api.LogWriter.class); + private final LogWriterImpl writer = new LogWriterImpl(underlying); + + @Test + public void testWrite() throws Exception { + LogRecord record = mock(LogRecord.class); + writer.write(record); + verify(underlying, times(1)).write(eq(record)); + } + + @Test + public void testWriteBulk() throws Exception { + List<LogRecord> records = mock(List.class); + writer.writeBulk(records); + verify(underlying, times(1)).writeBulk(eq(records)); + } + + @Test + public void testSetReadyToFlush() throws Exception { + writer.setReadyToFlush(); + verify(underlying, times(1)).setReadyToFlush(); + } + + @Test + public void testFlushAndSync() throws Exception { + writer.flushAndSync(); + verify(underlying, times(1)).flushAndSync(); + } + + @Test + public void testMarkEndOfStream() throws Exception { + writer.markEndOfStream(); + verify(underlying, times(1)).markEndOfStream(); + } + + @Test + public void testClose() throws Exception { + writer.close(); + verify(underlying, times(1)).close(); + } + + @Test + public void testAbort() throws Exception { + writer.abort(); + verify(underlying, times(1)).abort(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java new file mode 100644 index 0000000..e6573aa --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; +import org.apache.distributedlog.util.FutureUtils; +import org.junit.Test; + +/** + * Unit test of {@link SubscriptionsStoreImpl}. + */ +public class TestSubscriptionStoreImpl { + + private final SubscriptionsStore underlying = mock(SubscriptionsStore.class); + private final SubscriptionsStoreImpl store = new SubscriptionsStoreImpl(underlying); + + @Test + public void testGetLastCommitPosition() throws Exception { + String subscriber = "test-subscriber"; + DLSN dlsn = mock(DLSN.class); + when(underlying.getLastCommitPosition(anyString())) + .thenReturn(CompletableFuture.completedFuture(dlsn)); + assertEquals(dlsn, + FutureUtils.result(store.getLastCommitPosition(subscriber))); + verify(underlying, times(1)).getLastCommitPosition(eq(subscriber)); + } + + @Test + public void testGetLastCommitPositions() throws Exception { + Map<String, DLSN> positions = mock(Map.class); + when(underlying.getLastCommitPositions()) + .thenReturn(CompletableFuture.completedFuture(positions)); + assertEquals(positions, FutureUtils.result(store.getLastCommitPositions())); + verify(underlying, times(1)).getLastCommitPositions(); + } + + @Test + public void testAdvanceCommmitPosition() throws Exception { + String subscriber = "test-subscriber"; + DLSN dlsn = mock(DLSN.class); + when(underlying.advanceCommitPosition(anyString(), any(DLSN.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + FutureUtils.result(store.advanceCommitPosition(subscriber, dlsn)); + verify(underlying, times(1)) + .advanceCommitPosition(eq(subscriber), eq(dlsn)); + } + + @Test + public void testDeleteSubscriber() throws Exception { + String subscriber = "test-subscriber"; + when(underlying.deleteSubscriber(anyString())) + .thenReturn(CompletableFuture.completedFuture(true)); + assertTrue(FutureUtils.result(store.deleteSubscriber(subscriber))); + verify(underlying, times(1)).deleteSubscriber(eq(subscriber)); + } + + @Test + public void testClose() throws Exception { + store.close(); + verify(underlying, times(1)).close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java new file mode 100644 index 0000000..78dcb2a --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.namespace; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.net.URI; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.junit.Test; + +/** + * Unit test of {@link DistributedLogNamespaceBuilder}. + */ +public class TestDistributedLogNamespaceBuilder { + + private final NamespaceBuilder underlying = mock(NamespaceBuilder.class); + private final DistributedLogNamespaceBuilder builder = new DistributedLogNamespaceBuilder(underlying); + + @Test + public void testConf() { + DistributedLogConfiguration conf = mock(DistributedLogConfiguration.class); + builder.conf(conf); + verify(underlying, times(1)).conf(eq(conf)); + } + + @Test + public void testDynConf() { + DynamicDistributedLogConfiguration conf = mock(DynamicDistributedLogConfiguration.class); + builder.dynConf(conf); + verify(underlying, times(1)).dynConf(eq(conf)); + } + + @Test + public void testUri() { + URI uri = URI.create("distributedlog://127.0.0.1/messaging/distributedlog"); + builder.uri(uri); + verify(underlying, times(1)).uri(eq(uri)); + } + + @Test + public void testStatsLogger() { + StatsLogger statsLogger = mock(StatsLogger.class); + builder.statsLogger(statsLogger); + verify(underlying, times(1)).statsLogger(eq(statsLogger)); + } + + @Test + public void testPerLogStatsLogger() { + StatsLogger statsLogger = mock(StatsLogger.class); + builder.perLogStatsLogger(statsLogger); + verify(underlying, times(1)).perLogStatsLogger(eq(statsLogger)); + } + + @Test + public void testFeatureProvider() { + FeatureProvider provider = mock(FeatureProvider.class); + builder.featureProvider(provider); + verify(underlying, times(1)).featureProvider(eq(provider)); + } + + @Test + public void testClientId() { + String clientId = "test-client-id"; + builder.clientId(clientId); + verify(underlying, times(1)).clientId(eq(clientId)); + } + + @Test + public void testRegionId() { + int regionId = 1234; + builder.regionId(regionId); + verify(underlying, times(1)).regionId(eq(regionId)); + } + + @Test + public void testBuild() throws Exception { + builder.build(); + verify(underlying, times(1)).build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java new file mode 100644 index 0000000..b562fe4 --- /dev/null +++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.namespace; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.base.Optional; +import java.util.Iterator; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.callback.NamespaceListener; +import org.junit.Test; + +/** + * Unit test of {@link DistributedLogNamespaceImpl}. + */ +public class TestDistributedLogNamespaceImpl { + + private final Namespace impl = mock(Namespace.class); + private final DistributedLogNamespaceImpl namespace = new DistributedLogNamespaceImpl(impl); + + @Test + public void testGetNamespaceDriver() { + NamespaceDriver driver = mock(NamespaceDriver.class); + when(impl.getNamespaceDriver()).thenReturn(driver); + assertEquals(driver, namespace.getNamespaceDriver()); + verify(impl, times(1)).getNamespaceDriver(); + } + + @Test + public void testCreateLog() throws Exception { + String logName = "test-log-name"; + namespace.createLog(logName); + verify(impl, times(1)).createLog(eq(logName)); + } + + @Test + public void testDeleteLog() throws Exception { + String logName = "test-log-name"; + namespace.deleteLog(logName); + verify(impl, times(1)).deleteLog(eq(logName)); + } + + @Test + public void testOpenLog() throws Exception { + String logName = "test-open-log"; + namespace.openLog(logName); + verify(impl, times(1)).openLog(eq(logName)); + } + + @Test + public void testOpenLog2() throws Exception { + String logName = "test-open-log"; + namespace.openLog(logName, Optional.absent(), Optional.absent(), Optional.absent()); + verify(impl, times(1)) + .openLog(eq(logName), eq(Optional.absent()), eq(Optional.absent()), eq(Optional.absent())); + } + + @Test + public void testLogExists() throws Exception { + String logName = "test-log-exists"; + when(impl.logExists(anyString())).thenReturn(true); + assertTrue(namespace.logExists(logName)); + verify(impl, times(1)).logExists(eq(logName)); + } + + @Test + public void testGetLogs() throws Exception { + Iterator<String> logs = mock(Iterator.class); + when(impl.getLogs()).thenReturn(logs); + assertEquals(logs, namespace.getLogs()); + verify(impl, times(1)).getLogs(); + } + + @Test + public void testRegisterNamespaceListener() throws Exception { + NamespaceListener listener = mock(NamespaceListener.class); + namespace.registerNamespaceListener(listener); + verify(impl, times(1)).registerNamespaceListener(eq(listener)); + } + + @Test + public void testCreateAccessControlManager() throws Exception { + AccessControlManager manager = mock(AccessControlManager.class); + when(impl.createAccessControlManager()).thenReturn(manager); + assertEquals(manager, namespace.createAccessControlManager()); + verify(impl, times(1)).createAccessControlManager(); + } + + @Test + public void testClose() { + namespace.close(); + verify(impl, times(1)).close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties index 930db8d..af1cf5f 100644 --- a/distributedlog-core/conf/log4j.properties +++ b/distributedlog-core/conf/log4j.properties @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false -log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false log4j.appender.Executors=org.apache.log4j.RollingFileAppender http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml index a4f7568..1ad51c2 100644 --- a/distributedlog-core/pom.xml +++ b/distributedlog-core/pom.xml @@ -26,6 +26,16 @@ <name>Apache DistributedLog :: Core Library</name> <dependencies> <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.distributedlog</groupId> + <artifactId>distributedlog-protocol</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> @@ -41,50 +51,6 @@ </exclusions> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jmock</groupId> - <artifactId>jmock</artifactId> - <version>${jmock.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> - <dependency> - <groupId>com.twitter.common</groupId> - <artifactId>stats-util</artifactId> - <version>${stats-util.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>util-core_2.11</artifactId> - <version>${finagle.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> - <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>0.9.3</version> @@ -114,19 +80,28 @@ </exclusions> </dependency> <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>${commons-cli.version}</version> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.distributedlog</groupId> - <artifactId>distributedlog-protocol</artifactId> - <version>${project.parent.version}</version> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> </dependency> <dependency> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> - <version>${lz4.version}</version> + <groupId>org.jmock</groupId> + <artifactId>jmock</artifactId> + <version>${jmock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.mockito</groupId> @@ -136,7 +111,7 @@ </dependency> <dependency> <groupId>org.apache.distributedlog</groupId> - <artifactId>distributedlog-protocol</artifactId> + <artifactId>distributedlog-common</artifactId> <version>${project.parent.version}</version> <type>test-jar</type> <scope>test</scope> @@ -172,7 +147,7 @@ <properties> <property> <name>listener</name> - <value>org.apache.distributedlog.TimedOutTestsListener</value> + <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value> </property> </properties> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java index 1d96f0e..3a31907 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java @@ -18,10 +18,10 @@ package org.apache.distributedlog; import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; - +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java index 8278c68..dde78c2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java @@ -17,14 +17,12 @@ */ package org.apache.distributedlog; -import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; import java.io.Closeable; import java.io.IOException; - +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,16 +42,16 @@ public class AppendOnlyStreamWriter implements Closeable { this.requestPos = pos; } - public Future<DLSN> write(byte[] data) { + public CompletableFuture<DLSN> write(byte[] data) { requestPos += data.length; - Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data)); - return writeResult.addEventListener(new WriteCompleteListener(requestPos)); + CompletableFuture<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data)); + return writeResult.whenComplete(new WriteCompleteListener(requestPos)); } public void force(boolean metadata) throws IOException { long pos = 0; try { - pos = Await.result(logWriter.flushAndCommit()); + pos = FutureUtils.result(logWriter.flushAndCommit()); } catch (IOException ioe) { throw ioe; } catch (Exception ex) { @@ -78,7 +76,7 @@ public class AppendOnlyStreamWriter implements Closeable { public void markEndOfStream() throws IOException { try { - Await.result(logWriter.markEndOfStream()); + FutureUtils.result(logWriter.markEndOfStream()); } catch (IOException ioe) { throw ioe; } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java deleted file mode 100644 index e3ace05..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public interface AsyncLogReader extends AsyncCloseable { - - /** - * Get stream name that the reader reads from. - * - * @return stream name. - */ - public String getStreamName(); - - /** - * Read the next record from the log stream - * - * @return A promise that when satisfied will contain the Log Record with its DLSN. - */ - public Future<LogRecordWithDLSN> readNext(); - - /** - * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list - * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort - * call. - * - * @param numEntries - * num entries - * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. - */ - public Future<List<LogRecordWithDLSN>> readBulk(int numEntries); - - /** - * Read next <i>numEntries</i> entries in a given <i>waitTime</i>. - * <p> - * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>. - * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would - * wait until new entries are available. - * - * @param numEntries - * max entries to return - * @param waitTime - * maximum wait time if there are entries already for read - * @param timeUnit - * wait time unit - * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. - */ - public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java deleted file mode 100644 index 53b393b..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.io.AsyncAbortable; -import org.apache.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.io.Closeable; -import java.util.List; - -public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { - - /** - * Get the last committed transaction id. - * - * @return last committed transaction id. - */ - public long getLastTxId(); - - /** - * Write a log record to the stream. - * - * @param record single log record - * @return A Future which contains a DLSN if the record was successfully written - * or an exception if the write fails - */ - public Future<DLSN> write(LogRecord record); - - /** - * Write log records to the stream in bulk. Each future in the list represents the result of - * one write operation. The size of the result list is equal to the size of the input list. - * Buffers are written in order, and the list of result futures has the same order. - * - * @param record set of log records - * @return A Future which contains a list of Future DLSNs if the record was successfully written - * or an exception if the operation fails. - */ - public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record); - - /** - * Truncate the log until <i>dlsn</i>. - * - * @param dlsn - * dlsn to truncate until. - * @return A Future indicates whether the operation succeeds or not, or an exception - * if the truncation fails. - */ - public Future<Boolean> truncate(DLSN dlsn); - - /** - * Get the name of the stream this writer writes data to - */ - public String getStreamName(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java index c12bd10..367bb50 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog; -public interface AsyncNotification { +interface AsyncNotification { /** * Triggered when the background activity encounters an exception * http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java index 4a2ef30..8a0bffb 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java @@ -18,6 +18,9 @@ package org.apache.distributedlog; import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.AlreadyClosedException; import org.apache.distributedlog.exceptions.LockingException; @@ -27,18 +30,12 @@ import org.apache.distributedlog.io.Abortable; import org.apache.distributedlog.io.Abortables; import org.apache.distributedlog.io.AsyncAbortable; import org.apache.distributedlog.io.AsyncCloseable; -import org.apache.distributedlog.util.FutureUtils; -import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.common.util.PermitManager; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.Closeable; import java.io.IOException; @@ -53,18 +50,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab protected final BKDistributedLogManager bkDistributedLogManager; // States - private Promise<Void> closePromise = null; + private CompletableFuture<Void> closePromise = null; private volatile boolean forceRolling = false; private boolean forceRecovery = false; // Truncation Related - private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null; + private CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = null; @VisibleForTesting private Long minTimestampToKeepOverride = null; // Log Segment Writers protected BKLogSegmentWriter segmentWriter = null; - protected Future<BKLogSegmentWriter> segmentWriterFuture = null; + protected CompletableFuture<BKLogSegmentWriter> segmentWriterFuture = null; protected BKLogSegmentWriter allocatedSegmentWriter = null; protected BKLogWriteHandler writeHandler = null; @@ -100,7 +97,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab // This code path will be executed when the handler is not set or has been closed // due to forceRecovery during testing BKLogWriteHandler newHandler = - FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false)); + Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false)); boolean success = false; try { synchronized (this) { @@ -123,13 +120,13 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab return segmentWriter; } - protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() { + protected synchronized CompletableFuture<BKLogSegmentWriter> getCachedLogWriterFuture() { return segmentWriterFuture; } protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) { this.segmentWriter = logWriter; - this.segmentWriterFuture = Future.value(logWriter); + this.segmentWriterFuture = FutureUtils.value(logWriter); } protected synchronized BKLogSegmentWriter removeCachedLogWriter() { @@ -157,12 +154,12 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab } } - private Future<Void> asyncCloseAndComplete(boolean shouldThrow) { + private CompletableFuture<Void> asyncCloseAndComplete(boolean shouldThrow) { BKLogSegmentWriter segmentWriter = getCachedLogWriter(); BKLogWriteHandler writeHandler = getCachedWriteHandler(); if (null != segmentWriter && null != writeHandler) { cancelTruncation(); - Promise<Void> completePromise = new Promise<Void>(); + CompletableFuture<Void> completePromise = new CompletableFuture<Void>(); asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow); return completePromise; } else { @@ -172,10 +169,10 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter, final BKLogWriteHandler writeHandler, - final Promise<Void> completePromise, + final CompletableFuture<Void> completePromise, final boolean shouldThrow) { writeHandler.completeAndCloseLogSegment(segmentWriter) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + .whenComplete(new FutureEventListener<LogSegmentMetadata>() { @Override public void onSuccess(LogSegmentMetadata segment) { removeCachedLogWriter(); @@ -189,15 +186,11 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab } private void complete(final Throwable cause) { - closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - if (null != cause && shouldThrow) { - FutureUtils.setException(completePromise, cause); - } else { - FutureUtils.setValue(completePromise, null); - } - return BoxedUnit.UNIT; + FutureUtils.ensure(closeNoThrow(), () -> { + if (null != cause && shouldThrow) { + FutureUtils.completeExceptionally(completePromise, cause); + } else { + FutureUtils.complete(completePromise, null); } }); } @@ -206,63 +199,67 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab @VisibleForTesting void closeAndComplete() throws IOException { - FutureUtils.result(asyncCloseAndComplete(true)); + Utils.ioResult(asyncCloseAndComplete(true)); } - protected Future<Void> asyncCloseAndComplete() { + protected CompletableFuture<Void> asyncCloseAndComplete() { return asyncCloseAndComplete(true); } @Override public void close() throws IOException { - FutureUtils.result(asyncClose()); + Utils.ioResult(asyncClose()); } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return asyncCloseAndComplete(false); } /** * Close the writer and release all the underlying resources */ - protected Future<Void> closeNoThrow() { - Promise<Void> closeFuture; + protected CompletableFuture<Void> closeNoThrow() { + CompletableFuture<Void> closeFuture; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } cancelTruncation(); - Utils.closeSequence(bkDistributedLogManager.getScheduler(), - true, /** ignore close errors **/ - getCachedLogWriter(), - getAllocatedLogWriter(), - getCachedWriteHandler() - ).proxyTo(closeFuture); + FutureUtils.proxyTo( + Utils.closeSequence(bkDistributedLogManager.getScheduler(), + true, /** ignore close errors **/ + getCachedLogWriter(), + getAllocatedLogWriter(), + getCachedWriteHandler() + ), + closeFuture); return closeFuture; } @Override public void abort() throws IOException { - FutureUtils.result(asyncAbort()); + Utils.ioResult(asyncAbort()); } @Override - public Future<Void> asyncAbort() { - Promise<Void> closeFuture; + public CompletableFuture<Void> asyncAbort() { + CompletableFuture<Void> closeFuture; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } cancelTruncation(); - Abortables.abortSequence(bkDistributedLogManager.getScheduler(), - getCachedLogWriter(), - getAllocatedLogWriter(), - getCachedWriteHandler()).proxyTo(closeFuture); + FutureUtils.proxyTo( + Abortables.abortSequence(bkDistributedLogManager.getScheduler(), + getCachedLogWriter(), + getAllocatedLogWriter(), + getCachedWriteHandler()), + closeFuture); return closeFuture; } @@ -270,22 +267,22 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab protected BKLogSegmentWriter getLedgerWriter(final long startTxId, final boolean allowMaxTxID) throws IOException { - Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true); + CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true); BKLogSegmentWriter logSegmentWriter = null; if (null != logSegmentWriterFuture) { - logSegmentWriter = FutureUtils.result(logSegmentWriterFuture); + logSegmentWriter = Utils.ioResult(logSegmentWriterFuture); } if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) { - logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary( + logSegmentWriter = Utils.ioResult(rollLogSegmentIfNecessary( logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID)); } return logSegmentWriter; } // used by async writer - synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) { + synchronized protected CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) { final BKLogSegmentWriter ledgerWriter = getCachedLogWriter(); - Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture(); + CompletableFuture<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture(); if (null == ledgerWriterFuture || null == ledgerWriter) { return null; } @@ -293,38 +290,38 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab // Handle the case where the last call to write actually caused an error in the log if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) { // Close the ledger writer so that we will recover and start a new log segment - Future<Void> closeFuture; + CompletableFuture<Void> closeFuture; if (ledgerWriter.isLogSegmentInError()) { closeFuture = ledgerWriter.asyncAbort(); } else { closeFuture = ledgerWriter.asyncClose(); } - return closeFuture.flatMap( - new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() { + return closeFuture.thenCompose( + new Function<Void, CompletionStage<BKLogSegmentWriter>>() { @Override - public Future<BKLogSegmentWriter> apply(Void result) { + public CompletableFuture<BKLogSegmentWriter> apply(Void result) { removeCachedLogWriter(); if (ledgerWriter.isLogSegmentInError()) { - return Future.value(null); + return FutureUtils.value(null); } BKLogWriteHandler writeHandler; try { writeHandler = getWriteHandler(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } if (null != writeHandler && forceRecovery) { return writeHandler.completeAndCloseLogSegment(ledgerWriter) - .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() { + .thenApply(new Function<LogSegmentMetadata, BKLogSegmentWriter>() { @Override public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) { return null; } }); } else { - return Future.value(null); + return FutureUtils.value(null); } } }); @@ -357,32 +354,25 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab // skip scheduling if there is task that's already running // synchronized (this) { - if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { + if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDone())) { lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); } } } - private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, + private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, final long startTxId, final boolean allowMaxTxID) { return writeHandler.recoverIncompleteLogSegments() - .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(Long lastTxId) { - return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID) - .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() { - @Override - public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) { + .thenCompose( + lastTxId -> writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID) + .thenApply(newSegmentWriter -> { cacheLogWriter(newSegmentWriter); - return BoxedUnit.UNIT; - } - }); - } - }); + return newSegmentWriter; + })); } - private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit( + private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit( final BKLogSegmentWriter oldSegmentWriter, final BKLogWriteHandler writeHandler, final long startTxId, @@ -390,47 +380,46 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab final boolean allowMaxTxID) { final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit(); if (switchPermit.isAllowed()) { - return closeOldLogSegmentAndStartNewOne( - oldSegmentWriter, - writeHandler, - startTxId, - bestEffort, - allowMaxTxID - ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(Throwable cause) { - if (cause instanceof LockingException) { - LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ", - writeHandler.getFullyQualifiedName(), cause); - bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); - return Future.value(oldSegmentWriter); - } else if (cause instanceof ZKException) { - ZKException zke = (ZKException) cause; - if (ZKException.isRetryableZKException(zke)) { - LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." + - " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(), - zke.getKeeperExceptionCode()); + return FutureUtils.ensure( + FutureUtils.rescue( + closeOldLogSegmentAndStartNewOne( + oldSegmentWriter, + writeHandler, + startTxId, + bestEffort, + allowMaxTxID + ), + // rescue function + cause -> { + if (cause instanceof LockingException) { + LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ", + writeHandler.getFullyQualifiedName(), cause); bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); - return Future.value(oldSegmentWriter); + return FutureUtils.value(oldSegmentWriter); + } else if (cause instanceof ZKException) { + ZKException zke = (ZKException) cause; + if (ZKException.isRetryableZKException(zke)) { + LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." + + " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(), + zke.getKeeperExceptionCode()); + bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); + return FutureUtils.value(oldSegmentWriter); + } } + return FutureUtils.exception(cause); } - return Future.exception(cause); - } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - bkDistributedLogManager.getLogSegmentRollingPermitManager() - .releasePermit(switchPermit); - return BoxedUnit.UNIT; - } - }); + ), + // ensure function + () -> bkDistributedLogManager.getLogSegmentRollingPermitManager() + .releasePermit(switchPermit) + ); } else { bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit); - return Future.value(oldSegmentWriter); + return FutureUtils.value(oldSegmentWriter); } } - private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne( + private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne( final BKLogSegmentWriter oldSegmentWriter, final BKLogWriteHandler writeHandler, final long startTxId, @@ -444,14 +433,14 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab writeHandler.getFullyQualifiedName()); } return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID) - .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { + .thenCompose(new Function<BKLogSegmentWriter, CompletableFuture<BKLogSegmentWriter>>() { @Override - public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) { + public CompletableFuture<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) { if (null == newSegmentWriter) { if (bestEffort) { - return Future.value(oldSegmentWriter); + return FutureUtils.value(oldSegmentWriter); } else { - return Future.exception( + return FutureUtils.exception( new UnexpectedException("StartLogSegment returns null for bestEffort rolling")); } } @@ -468,30 +457,30 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab } } - private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter( + private CompletableFuture<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter( BKLogSegmentWriter oldSegmentWriter, final BKLogSegmentWriter newSegmentWriter) { - final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>(); + final CompletableFuture<BKLogSegmentWriter> completePromise = new CompletableFuture<BKLogSegmentWriter>(); // complete the old log segment writeHandler.completeAndCloseLogSegment(oldSegmentWriter) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + .whenComplete(new FutureEventListener<LogSegmentMetadata>() { @Override public void onSuccess(LogSegmentMetadata value) { cacheLogWriter(newSegmentWriter); removeAllocatedLogWriter(); - FutureUtils.setValue(completePromise, newSegmentWriter); + FutureUtils.complete(completePromise, newSegmentWriter); } @Override public void onFailure(Throwable cause) { - FutureUtils.setException(completePromise, cause); + FutureUtils.completeExceptionally(completePromise, cause); } }); return completePromise; } - synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary( + synchronized protected CompletableFuture<BKLogSegmentWriter> rollLogSegmentIfNecessary( final BKLogSegmentWriter segmentWriter, long startTxId, boolean bestEffort, @@ -500,18 +489,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab try { writeHandler = getWriteHandler(); } catch (IOException e) { - return Future.exception(e); + return FutureUtils.exception(e); } - Future<BKLogSegmentWriter> rollPromise; + CompletableFuture<BKLogSegmentWriter> rollPromise; if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) { rollPromise = closeOldLogSegmentAndStartNewOneWithPermit( segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID); } else if (null == segmentWriter) { rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID); } else { - rollPromise = Future.value(segmentWriter); + rollPromise = FutureUtils.value(segmentWriter); } - return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() { + return rollPromise.thenApply(new Function<BKLogSegmentWriter, BKLogSegmentWriter>() { @Override public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) { if (segmentWriter == newSegmentWriter) { @@ -542,7 +531,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab protected synchronized void cancelTruncation() { if (null != lastTruncationAttempt) { - FutureUtils.cancel(lastTruncationAttempt); + lastTruncationAttempt.cancel(true); lastTruncationAttempt = null; } }