http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
new file mode 100644
index 0000000..d2d61a9
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.addr.WeightedAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link RoutingService}.
+ *
+ * <p>The test runs once for each combination of the three boolean switches
+ * returned by {@link #configs()}.
+ */
+@RunWith(Parameterized.class)
+public class TestRoutingService {
+
+    static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
+
+    /**
+     * Enumerates all eight combinations of the three boolean constructor arguments.
+     *
+     * @return one three-element array per combination
+     */
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        List<Object[]> list = new ArrayList<Object[]>();
+        for (int i = 0; i <= 1; i++) {
+            for (int j = 0; j <= 1; j++) {
+                for (int k = 0; k <= 1; k++) {
+                    list.add(new Boolean[] {i == 1, j == 1, k == 1});
+                }
+            }
+        }
+        return list;
+    }
+
+    private final boolean consistentHash;
+    private final boolean weightedAddresses;
+    private final boolean asyncResolution;
+
+    public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
+        this.consistentHash = consistentHash;
+        this.weightedAddresses = weightedAddresses;
+        this.asyncResolution = asyncResolution;
+    }
+
+    /**
+     * Builds the seven addresses 127.0.0.1 through 127.0.0.7, all on port 3181.
+     *
+     * @param weightedAddresses when true, every address is wrapped with a weight of 1.0
+     * @return the (optionally weighted) addresses
+     */
+    private List<Address> getAddresses(boolean weightedAddresses) {
+        List<Address> addresses = new ArrayList<Address>();
+        for (int host = 1; host <= 7; host++) {
+            addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0." + host, 3181)));
+        }
+
+        if (!weightedAddresses) {
+            return addresses;
+        }
+        List<Address> wAddresses = new ArrayList<Address>();
+        for (Address address : addresses) {
+            wAddresses.add(WeightedAddress.apply(address, 1.0));
+        }
+        return wAddresses;
+    }
+
+    /**
+     * Publishes the test addresses through a TestName, starts the selected routing service
+     * and checks that the hosts chosen for 5000 stream names cover as many distinct socket
+     * addresses as were published.
+     *
+     * @param consistentHash build a ConsistentHashRoutingService instead of a ServerSetRoutingService
+     * @param weightedAddresses publish weighted addresses
+     * @param asyncResolution publish the addresses from an executor thread instead of the caller
+     * @throws Exception propagated from the routing service under test
+     */
+    private void testRoutingServiceHelper(boolean consistentHash,
+                                          boolean weightedAddresses,
+                                          boolean asyncResolution)
+        throws Exception {
+        ExecutorService executorService = null;
+        final List<Address> addresses = getAddresses(weightedAddresses);
+        final TestName name = new TestName();
+        RoutingService routingService;
+        if (consistentHash) {
+            routingService = ConsistentHashRoutingService.newBuilder()
+                    .serverSet(new NameServerSet(name))
+                    .resolveFromName(true)
+                    .numReplicas(997)
+                    .build();
+        } else {
+            routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
+                    .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
+        }
+
+        try {
+            if (asyncResolution) {
+                executorService = Executors.newSingleThreadExecutor();
+                executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        name.changeAddrs(addresses);
+                    }
+                });
+            } else {
+                name.changeAddrs(addresses);
+            }
+            routingService.startService();
+
+            HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
+
+            for (int i = 0; i < 1000; i++) {
+                for (int j = 0; j < 5; j++) {
+                    String stream = "TestStream-" + i + "-" + j;
+                    mapping.add(routingService.getHost(stream,
+                            RoutingService.RoutingContext.of(new DefaultRegionResolver())));
+                }
+            }
+
+            // JUnit's signature is assertEquals(expected, actual).
+            assertEquals(addresses.size(), mapping.size());
+        } finally {
+            // Release the executor thread even when the checks above fail.
+            if (null != executorService) {
+                executorService.shutdown();
+            }
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testRoutingService() throws Exception {
+        testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..ab0cb58
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import com.twitter.util.CountDownLatch;
+import com.twitter.util.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link DefaultSpeculativeRequestExecutionPolicy}.
+ */
+public class TestDefaultSpeculativeRequestExecutionPolicy {
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidBackoffMultiplier() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
+    }
+
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidMaxSpeculativeTimeout() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2);
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequests() throws Exception {
+        assertTimeoutAfterThreeRequests(new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2), 40);
+    }
+
+    @Test(timeout = 20000)
+    public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
+        assertTimeoutAfterThreeRequests(new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2), 15);
+    }
+
+    /**
+     * Runs {@code policy} against a mocked executor whose first two speculative requests
+     * complete with {@code true} and whose third completes with {@code false}, waits for
+     * the third request to be issued and then checks the policy's next timeout.
+     *
+     * @param policy the policy under test
+     * @param expectedTimeout the expected value of {@code getNextSpeculativeRequestTimeout()}
+     * @throws Exception propagated from the policy or from waiting on the latch
+     */
+    private static void assertTimeoutAfterThreeRequests(
+            DefaultSpeculativeRequestExecutionPolicy policy, int expectedTimeout) throws Exception {
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < 3);
+                } finally {
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        try {
+            policy.initiateSpeculativeRequest(executorService, executor);
+
+            latch.await();
+
+            assertEquals(expectedTimeout, policy.getNextSpeculativeRequestTimeout());
+        } finally {
+            // Stop the scheduler thread so it does not outlive the test.
+            executorService.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
new file mode 100644
index 0000000..d2df9a5
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertNotSame;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import org.junit.Test;
+
+/**
+ * Test Case of {@link DistributedLogClientBuilder}: each {@code build()} call must yield a distinct client.
+ */
+public class TestDistributedLogClientBuilder {
+
+    @Test(timeout = 60000)
+    public void testBuildClientsFromSameBuilder() throws Exception {
+        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+                .name("build-clients-from-same-builder")
+                .clientId(ClientId$.MODULE$.apply("test-builder"))
+                .finagleNameStr("inet!127.0.0.1:7001")
+                .streamNameRegex(".*")
+                .handshakeWithClientInfo(true)
+                .clientBuilder(ClientBuilder.get()
+                    .hostConnectionLimit(1)
+                    .connectTimeout(Duration.fromSeconds(1))
+                    .tcpConnectTimeout(Duration.fromSeconds(1))
+                    .requestTimeout(Duration.fromSeconds(10)));
+        DistributedLogClient client1 = builder.build();
+        DistributedLogClient client2 = builder.build();
+        assertNotSame(client1, client2); // identity check; reports both objects on failure
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties
index cafc888..38ab34d 100644
--- a/distributedlog-core/conf/log4j.properties
+++ b/distributedlog-core/conf/log4j.properties
@@ -32,11 +32,11 @@
log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.com.twitter.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors +log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors +log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.com.twitter.distributedlog.util.MonitoredFuturePool=false -log4j.additivity.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false +log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false +log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false log4j.appender.Executors=org.apache.log4j.RollingFileAppender http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/zookeeper.conf.dynamic.template ---------------------------------------------------------------------- diff --git a/distributedlog-core/conf/zookeeper.conf.dynamic.template b/distributedlog-core/conf/zookeeper.conf.dynamic.template index 4bda9f1..f4e35f5 100644 --- a/distributedlog-core/conf/zookeeper.conf.dynamic.template +++ b/distributedlog-core/conf/zookeeper.conf.dynamic.template @@ -1 +1 @@ -#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements. See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership. The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License. 
You may obtain a copy of the License at# *# * http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181 \ No newline at end of file +#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements. See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership. The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License. You may obtain a copy of the License at# *# * http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml index c5329aa..c4bfa8f 100644 --- a/distributedlog-core/pom.xml +++ b/distributedlog-core/pom.xml @@ -206,7 +206,7 @@ <properties> <property> <name>listener</name> - <value>com.twitter.distributedlog.TimedOutTestsListener</value> + <value>org.apache.distributedlog.TimedOutTestsListener</value> </property> </properties> </configuration> 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java deleted file mode 100644 index 0f93bfe..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog; - -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.io.InputStream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AppendOnlyStreamReader extends InputStream { - static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class); - - private LogRecordWithInputStream currentLogRecord = null; - private final DistributedLogManager dlm; - private LogReader reader; - private long currentPosition; - private static final int SKIP_BUFFER_SIZE = 512; - - // Cache the input stream for a log record. - private static class LogRecordWithInputStream { - private final InputStream payloadStream; - private final LogRecordWithDLSN logRecord; - - LogRecordWithInputStream(LogRecordWithDLSN logRecord) { - Preconditions.checkNotNull(logRecord); - - LOG.debug("Got record dlsn = {}, txid = {}, len = {}", - new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length}); - - this.logRecord = logRecord; - this.payloadStream = logRecord.getPayLoadInputStream(); - } - - InputStream getPayLoadInputStream() { - return payloadStream; - } - - LogRecordWithDLSN getLogRecord() { - return logRecord; - } - - // The last txid of the log record is the position of the next byte in the stream. - // Subtract length to get starting offset. - long getOffset() { - return logRecord.getTransactionId() - logRecord.getPayload().length; - } - } - - /** - * Construct ledger input stream - * - * @param dlm the Distributed Log Manager to access the stream - */ - AppendOnlyStreamReader(DistributedLogManager dlm) - throws IOException { - this.dlm = dlm; - reader = dlm.getInputStream(0); - currentPosition = 0; - } - - /** - * Get input stream representing next entry in the - * ledger. 
- * - * @return input stream, or null if no more entries - */ - private LogRecordWithInputStream nextLogRecord() throws IOException { - return nextLogRecord(reader); - } - - private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException { - LogRecordWithDLSN record = reader.readNext(false); - - if (null != record) { - return new LogRecordWithInputStream(record); - } else { - record = reader.readNext(false); - if (null != record) { - return new LogRecordWithInputStream(record); - } else { - LOG.debug("No record"); - return null; - } - } - } - - @Override - public int read() throws IOException { - byte[] b = new byte[1]; - if (read(b, 0, 1) != 1) { - return -1; - } else { - return b[0]; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int read = 0; - if (currentLogRecord == null) { - currentLogRecord = nextLogRecord(); - if (currentLogRecord == null) { - return read; - } - } - - while (read < len) { - int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read)); - if (thisread == -1) { - currentLogRecord = nextLogRecord(); - if (currentLogRecord == null) { - return read; - } - } else { - LOG.debug("Offset saved = {}, persisted = {}", - currentPosition, currentLogRecord.getLogRecord().getTransactionId()); - currentPosition += thisread; - read += thisread; - } - } - return read; - } - - /** - * Position the reader at the given offset. If we fail to skip to the desired position - * and don't hit end of stream, return false. - * - * @throws com.twitter.distributedlog.exceptions.EndOfStreamException if we attempt to - * skip past the end of the stream. - */ - public boolean skipTo(long position) throws IOException { - - // No need to skip anywhere. 
- if (position == position()) { - return true; - } - - LogReader skipReader = dlm.getInputStream(position); - LogRecordWithInputStream logRecord = null; - try { - logRecord = nextLogRecord(skipReader); - } catch (IOException ex) { - skipReader.close(); - throw ex; - } - - if (null == logRecord) { - return false; - } - - // We may end up with a reader positioned *before* the requested position if - // we're near the tail and the writer is still active, or if the desired position - // is not at a log record payload boundary. - // Transaction ID gives us the starting position of the log record. Read ahead - // if necessary. - currentPosition = logRecord.getOffset(); - currentLogRecord = logRecord; - LogReader oldReader = reader; - reader = skipReader; - - // Close the oldreader after swapping AppendOnlyStreamReader state. Close may fail - // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state. - oldReader.close(); - - byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE]; - while (currentPosition < position) { - long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE); - long bytesRead = read(skipBuffer, 0, (int)bytesToRead); - if (bytesRead < bytesToRead) { - return false; - } - } - - return true; - } - - public long position() { - return currentPosition; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java deleted file mode 100644 index aa0aef9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import java.io.Closeable; -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AppendOnlyStreamWriter implements Closeable { - static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class); - - // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive - // type because synchronized block is needed anyway. 
- final long[] syncPos = new long[1]; - BKAsyncLogWriter logWriter; - long requestPos = 0; - - public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) { - LOG.debug("initialize at position {}", pos); - this.logWriter = logWriter; - this.syncPos[0] = pos; - this.requestPos = pos; - } - - public Future<DLSN> write(byte[] data) { - requestPos += data.length; - Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data)); - return writeResult.addEventListener(new WriteCompleteListener(requestPos)); - } - - public void force(boolean metadata) throws IOException { - long pos = 0; - try { - pos = Await.result(logWriter.flushAndCommit()); - } catch (IOException ioe) { - throw ioe; - } catch (Exception ex) { - LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex); - throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex); - } - synchronized (syncPos) { - syncPos[0] = pos; - } - } - - public long position() { - synchronized (syncPos) { - return syncPos[0]; - } - } - - @Override - public void close() throws IOException { - logWriter.closeAndComplete(); - } - - public void markEndOfStream() throws IOException { - try { - Await.result(logWriter.markEndOfStream()); - } catch (IOException ioe) { - throw ioe; - } catch (Exception ex) { - throw new UnexpectedException("Mark end of stream hit unexpected exception", ex); - } - } - - class WriteCompleteListener implements FutureEventListener<DLSN> { - private final long position; - public WriteCompleteListener(long position) { - this.position = position; - } - @Override - public void onSuccess(DLSN response) { - synchronized (syncPos) { - if (position > syncPos[0]) { - syncPos[0] = position; - } - } - } - @Override - public void onFailure(Throwable cause) { - // Handled at the layer above - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java deleted file mode 100644 index 8e07797..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public interface AsyncLogReader extends AsyncCloseable { - - /** - * Get stream name that the reader reads from. - * - * @return stream name. - */ - public String getStreamName(); - - /** - * Read the next record from the log stream - * - * @return A promise that when satisfied will contain the Log Record with its DLSN. - */ - public Future<LogRecordWithDLSN> readNext(); - - /** - * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list - * of entries. It doesn't block until returning exact <i>numEntries</i>. 
It is a best effort - * call. - * - * @param numEntries - * num entries - * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. - */ - public Future<List<LogRecordWithDLSN>> readBulk(int numEntries); - - /** - * Read next <i>numEntries</i> entries in a given <i>waitTime</i>. - * <p> - * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>. - * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would - * wait until new entries are available. - * - * @param numEntries - * max entries to return - * @param waitTime - * maximum wait time if there are entries already for read - * @param timeUnit - * wait time unit - * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. - */ - public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java deleted file mode 100644 index e83e343..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.twitter.distributedlog.io.AsyncAbortable; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -import java.io.Closeable; -import java.util.List; - -public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { - - /** - * Get the last committed transaction id. - * - * @return last committed transaction id. - */ - public long getLastTxId(); - - /** - * Write a log record to the stream. - * - * @param record single log record - * @return A Future which contains a DLSN if the record was successfully written - * or an exception if the write fails - */ - public Future<DLSN> write(LogRecord record); - - /** - * Write log records to the stream in bulk. Each future in the list represents the result of - * one write operation. The size of the result list is equal to the size of the input list. - * Buffers are written in order, and the list of result futures has the same order. - * - * @param record set of log records - * @return A Future which contains a list of Future DLSNs if the record was successfully written - * or an exception if the operation fails. - */ - public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record); - - /** - * Truncate the log until <i>dlsn</i>. - * - * @param dlsn - * dlsn to truncate until. - * @return A Future indicates whether the operation succeeds or not, or an exception - * if the truncation fails. 
- */ - public Future<Boolean> truncate(DLSN dlsn); - - /** - * Get the name of the stream this writer writes data to - */ - public String getStreamName(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java deleted file mode 100644 index bd71147..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -public interface AsyncNotification { - /** - * Triggered when the background activity encounters an exception - * - * @param reason the exception that encountered. 
- */ - void notifyOnError(Throwable reason); - - /** - * Triggered when the background activity completes an operation - */ - void notifyOnOperationComplete(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java deleted file mode 100644 index d1c28d7..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java +++ /dev/null @@ -1,555 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.io.Abortable; -import com.twitter.distributedlog.io.Abortables; -import com.twitter.distributedlog.io.AsyncAbortable; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.PermitManager; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable { - static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class); - - protected final DistributedLogConfiguration conf; - private final DynamicDistributedLogConfiguration dynConf; - protected final BKDistributedLogManager bkDistributedLogManager; - - // States - private Promise<Void> closePromise = null; - private volatile boolean forceRolling = false; - private boolean forceRecovery = false; - - // Truncation Related - private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null; - @VisibleForTesting - private Long minTimestampToKeepOverride = null; - - // Log Segment Writers - protected BKLogSegmentWriter segmentWriter 
= null; - protected Future<BKLogSegmentWriter> segmentWriterFuture = null; - protected BKLogSegmentWriter allocatedSegmentWriter = null; - protected BKLogWriteHandler writeHandler = null; - - BKAbstractLogWriter(DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - BKDistributedLogManager bkdlm) { - this.conf = conf; - this.dynConf = dynConf; - this.bkDistributedLogManager = bkdlm; - LOG.debug("Initial retention period for {} : {}", bkdlm.getStreamName(), - TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS)); - } - - // manage write handler - - synchronized protected BKLogWriteHandler getCachedWriteHandler() { - return writeHandler; - } - - protected BKLogWriteHandler getWriteHandler() throws IOException { - BKLogWriteHandler writeHandler = createAndCacheWriteHandler(); - writeHandler.checkMetadataException(); - return writeHandler; - } - - protected BKLogWriteHandler createAndCacheWriteHandler() - throws IOException { - synchronized (this) { - if (writeHandler != null) { - return writeHandler; - } - } - // This code path will be executed when the handler is not set or has been closed - // due to forceRecovery during testing - BKLogWriteHandler newHandler = - FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false)); - boolean success = false; - try { - synchronized (this) { - if (writeHandler == null) { - writeHandler = newHandler; - success = true; - } - return writeHandler; - } - } finally { - if (!success) { - newHandler.asyncAbort(); - } - } - } - - // manage log segment writers - - protected synchronized BKLogSegmentWriter getCachedLogWriter() { - return segmentWriter; - } - - protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() { - return segmentWriterFuture; - } - - protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) { - this.segmentWriter = logWriter; - this.segmentWriterFuture = Future.value(logWriter); - } - - protected synchronized 
BKLogSegmentWriter removeCachedLogWriter() { - try { - return segmentWriter; - } finally { - segmentWriter = null; - segmentWriterFuture = null; - } - } - - protected synchronized BKLogSegmentWriter getAllocatedLogWriter() { - return allocatedSegmentWriter; - } - - protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) { - this.allocatedSegmentWriter = logWriter; - } - - protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() { - try { - return allocatedSegmentWriter; - } finally { - allocatedSegmentWriter = null; - } - } - - private Future<Void> asyncCloseAndComplete(boolean shouldThrow) { - BKLogSegmentWriter segmentWriter = getCachedLogWriter(); - BKLogWriteHandler writeHandler = getCachedWriteHandler(); - if (null != segmentWriter && null != writeHandler) { - cancelTruncation(); - Promise<Void> completePromise = new Promise<Void>(); - asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow); - return completePromise; - } else { - return closeNoThrow(); - } - } - - private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter, - final BKLogWriteHandler writeHandler, - final Promise<Void> completePromise, - final boolean shouldThrow) { - writeHandler.completeAndCloseLogSegment(segmentWriter) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { - @Override - public void onSuccess(LogSegmentMetadata segment) { - removeCachedLogWriter(); - complete(null); - } - - @Override - public void onFailure(Throwable cause) { - LOG.error("Completing Log segments encountered exception", cause); - complete(cause); - } - - private void complete(final Throwable cause) { - closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - if (null != cause && shouldThrow) { - FutureUtils.setException(completePromise, cause); - } else { - FutureUtils.setValue(completePromise, null); - } - return BoxedUnit.UNIT; - } - }); - } - }); - } - - @VisibleForTesting - 
void closeAndComplete() throws IOException { - FutureUtils.result(asyncCloseAndComplete(true)); - } - - protected Future<Void> asyncCloseAndComplete() { - return asyncCloseAndComplete(true); - } - - @Override - public void close() throws IOException { - FutureUtils.result(asyncClose()); - } - - @Override - public Future<Void> asyncClose() { - return asyncCloseAndComplete(false); - } - - /** - * Close the writer and release all the underlying resources - */ - protected Future<Void> closeNoThrow() { - Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - } - cancelTruncation(); - Utils.closeSequence(bkDistributedLogManager.getScheduler(), - true, /** ignore close errors **/ - getCachedLogWriter(), - getAllocatedLogWriter(), - getCachedWriteHandler() - ).proxyTo(closeFuture); - return closeFuture; - } - - @Override - public void abort() throws IOException { - FutureUtils.result(asyncAbort()); - } - - @Override - public Future<Void> asyncAbort() { - Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - } - cancelTruncation(); - Abortables.abortSequence(bkDistributedLogManager.getScheduler(), - getCachedLogWriter(), - getAllocatedLogWriter(), - getCachedWriteHandler()).proxyTo(closeFuture); - return closeFuture; - } - - // used by sync writer - protected BKLogSegmentWriter getLedgerWriter(final long startTxId, - final boolean allowMaxTxID) - throws IOException { - Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true); - BKLogSegmentWriter logSegmentWriter = null; - if (null != logSegmentWriterFuture) { - logSegmentWriter = FutureUtils.result(logSegmentWriterFuture); - } - if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) { - logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary( - 
logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID)); - } - return logSegmentWriter; - } - - // used by async writer - synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) { - final BKLogSegmentWriter ledgerWriter = getCachedLogWriter(); - Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture(); - if (null == ledgerWriterFuture || null == ledgerWriter) { - return null; - } - - // Handle the case where the last call to write actually caused an error in the log - if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) { - // Close the ledger writer so that we will recover and start a new log segment - Future<Void> closeFuture; - if (ledgerWriter.isLogSegmentInError()) { - closeFuture = ledgerWriter.asyncAbort(); - } else { - closeFuture = ledgerWriter.asyncClose(); - } - return closeFuture.flatMap( - new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(Void result) { - removeCachedLogWriter(); - - if (ledgerWriter.isLogSegmentInError()) { - return Future.value(null); - } - - BKLogWriteHandler writeHandler; - try { - writeHandler = getWriteHandler(); - } catch (IOException e) { - return Future.exception(e); - } - if (null != writeHandler && forceRecovery) { - return writeHandler.completeAndCloseLogSegment(ledgerWriter) - .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() { - @Override - public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) { - return null; - } - }); - } else { - return Future.value(null); - } - } - }); - } else { - return ledgerWriterFuture; - } - } - - boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException { - BKLogWriteHandler writeHandler = getWriteHandler(); - return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling; - } - - private void truncateLogSegmentsIfNecessary(BKLogWriteHandler 
writeHandler) { - boolean truncationEnabled = false; - - long minTimestampToKeep = 0; - - long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS); - if (retentionPeriodInMillis > 0) { - minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis; - truncationEnabled = true; - } - - if (null != minTimestampToKeepOverride) { - minTimestampToKeep = minTimestampToKeepOverride; - truncationEnabled = true; - } - - // skip scheduling if there is task that's already running - // - synchronized (this) { - if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { - lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); - } - } - } - - private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, - final long startTxId, - final boolean allowMaxTxID) { - return writeHandler.recoverIncompleteLogSegments() - .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(Long lastTxId) { - return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID) - .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() { - @Override - public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) { - cacheLogWriter(newSegmentWriter); - return BoxedUnit.UNIT; - } - }); - } - }); - } - - private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit( - final BKLogSegmentWriter oldSegmentWriter, - final BKLogWriteHandler writeHandler, - final long startTxId, - final boolean bestEffort, - final boolean allowMaxTxID) { - final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit(); - if (switchPermit.isAllowed()) { - return closeOldLogSegmentAndStartNewOne( - oldSegmentWriter, - writeHandler, - startTxId, - bestEffort, - allowMaxTxID - ).rescue(new Function<Throwable, 
Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(Throwable cause) { - if (cause instanceof LockingException) { - LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ", - writeHandler.getFullyQualifiedName(), cause); - bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); - return Future.value(oldSegmentWriter); - } else if (cause instanceof ZKException) { - ZKException zke = (ZKException) cause; - if (ZKException.isRetryableZKException(zke)) { - LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." + - " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(), - zke.getKeeperExceptionCode()); - bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); - return Future.value(oldSegmentWriter); - } - } - return Future.exception(cause); - } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - bkDistributedLogManager.getLogSegmentRollingPermitManager() - .releasePermit(switchPermit); - return BoxedUnit.UNIT; - } - }); - } else { - bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit); - return Future.value(oldSegmentWriter); - } - } - - private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne( - final BKLogSegmentWriter oldSegmentWriter, - final BKLogWriteHandler writeHandler, - final long startTxId, - final boolean bestEffort, - final boolean allowMaxTxID) { - // we switch only when we could allocate a new log segment. 
- BKLogSegmentWriter newSegmentWriter = getAllocatedLogWriter(); - if (null == newSegmentWriter) { - if (LOG.isDebugEnabled()) { - LOG.debug("Allocating a new log segment from {} for {}.", startTxId, - writeHandler.getFullyQualifiedName()); - } - return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID) - .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { - @Override - public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) { - if (null == newSegmentWriter) { - if (bestEffort) { - return Future.value(oldSegmentWriter); - } else { - return Future.exception( - new UnexpectedException("StartLogSegment returns null for bestEffort rolling")); - } - } - cacheAllocatedLogWriter(newSegmentWriter); - if (LOG.isDebugEnabled()) { - LOG.debug("Allocated a new log segment from {} for {}.", startTxId, - writeHandler.getFullyQualifiedName()); - } - return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter); - } - }); - } else { - return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter); - } - } - - private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter( - BKLogSegmentWriter oldSegmentWriter, - final BKLogSegmentWriter newSegmentWriter) { - final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>(); - // complete the old log segment - writeHandler.completeAndCloseLogSegment(oldSegmentWriter) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { - - @Override - public void onSuccess(LogSegmentMetadata value) { - cacheLogWriter(newSegmentWriter); - removeAllocatedLogWriter(); - FutureUtils.setValue(completePromise, newSegmentWriter); - } - - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(completePromise, cause); - } - }); - return completePromise; - } - - synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary( - final 
BKLogSegmentWriter segmentWriter, - long startTxId, - boolean bestEffort, - boolean allowMaxTxID) { - final BKLogWriteHandler writeHandler; - try { - writeHandler = getWriteHandler(); - } catch (IOException e) { - return Future.exception(e); - } - Future<BKLogSegmentWriter> rollPromise; - if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) { - rollPromise = closeOldLogSegmentAndStartNewOneWithPermit( - segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID); - } else if (null == segmentWriter) { - rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID); - } else { - rollPromise = Future.value(segmentWriter); - } - return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() { - @Override - public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) { - if (segmentWriter == newSegmentWriter) { - return newSegmentWriter; - } - truncateLogSegmentsIfNecessary(writeHandler); - return newSegmentWriter; - } - }); - } - - protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { - if (null != closePromise) { - LOG.error("Executing " + operation + " on already closed Log Writer"); - throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer"); - } - } - - @VisibleForTesting - public void setForceRolling(boolean forceRolling) { - this.forceRolling = forceRolling; - } - - @VisibleForTesting - public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) { - this.minTimestampToKeepOverride = minTimestampToKeepOverride; - } - - protected synchronized void cancelTruncation() { - if (null != lastTruncationAttempt) { - FutureUtils.cancel(lastTruncationAttempt); - lastTruncationAttempt = null; - } - } - - @VisibleForTesting - public synchronized void setForceRecovery(boolean forceRecovery) { - this.forceRecovery = forceRecovery; - } - -}