http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java index d874274..2a8c83b 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java @@ -27,6 +27,8 @@ import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.ZooKeeperClientUtils; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.exceptions.ZKException; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadata; +import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter; import com.twitter.distributedlog.util.DLUtils; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; @@ -57,6 +59,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; /** * Test ZK based log segment metadata store. @@ -133,14 +136,14 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testCreateLogSegment() throws Exception { LogSegmentMetadata segment = createLogSegment(1L); Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); LogSegmentMetadata segment2 = createLogSegment(1L); Transaction<Object> createTxn2 = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn2, segment2); + lsmStore.createLogSegment(createTxn2, segment2, null); try { FutureUtils.result(createTxn2.execute()); fail("Should fail if log segment exists"); @@ -158,13 +161,13 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testDeleteLogSegment() throws Exception { LogSegmentMetadata segment = createLogSegment(1L); Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", zkc.get().exists(segment.getZkPath(), false)); Transaction<Object> deleteTxn = lsmStore.transaction(); - lsmStore.deleteLogSegment(deleteTxn, segment); + lsmStore.deleteLogSegment(deleteTxn, segment, null); FutureUtils.result(deleteTxn.execute()); assertNull("LogSegment " + segment + " should be deleted", zkc.get().exists(segment.getZkPath(), false)); @@ -174,7 +177,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testDeleteNonExistentLogSegment() throws Exception { LogSegmentMetadata segment = createLogSegment(1L); Transaction<Object> deleteTxn = lsmStore.transaction(); - lsmStore.deleteLogSegment(deleteTxn, segment); + lsmStore.deleteLogSegment(deleteTxn, segment, null); try { FutureUtils.result(deleteTxn.execute()); fail("Should fail deletion if log segment doesn't exist"); @@ -208,7 +211,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testUpdateLogSegment() throws Exception { LogSegmentMetadata segment = createLogSegment(1L, 99L); Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", @@ -230,15 +233,15 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment2 = createLogSegment(2L); // create log segment 1 Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment1); + lsmStore.createLogSegment(createTxn, segment1, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment1 + " should be created", zkc.get().exists(segment1.getZkPath(), false)); // delete log segment 1 and create log segment 2 Transaction<Object> createDeleteTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createDeleteTxn, segment2); - lsmStore.deleteLogSegment(createDeleteTxn, segment1); + lsmStore.createLogSegment(createDeleteTxn, segment2, null); + lsmStore.deleteLogSegment(createDeleteTxn, segment1, null); FutureUtils.result(createDeleteTxn.execute()); // segment 1 should be deleted, segment 2 should be created assertNull("LogSegment " + segment1 + " should be deleted", @@ -254,16 +257,16 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { LogSegmentMetadata segment3 = createLogSegment(3L); // create log segment 1 Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment1); + lsmStore.createLogSegment(createTxn, segment1, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment1 + " should be created", zkc.get().exists(segment1.getZkPath(), false)); // delete log segment 1 and delete log segment 2 Transaction<Object> createDeleteTxn = lsmStore.transaction(); - lsmStore.deleteLogSegment(createDeleteTxn, segment1); - lsmStore.deleteLogSegment(createDeleteTxn, segment2); - lsmStore.createLogSegment(createDeleteTxn, segment3); + lsmStore.deleteLogSegment(createDeleteTxn, segment1, null); + lsmStore.deleteLogSegment(createDeleteTxn, segment2, null); + lsmStore.createLogSegment(createDeleteTxn, segment3, null); try { FutureUtils.result(createDeleteTxn.execute()); fail("Should fail transaction if one operation failed"); @@ -286,7 +289,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { public void testGetLogSegment() throws Exception { LogSegmentMetadata segment = createLogSegment(1L, 99L); Transaction<Object> createTxn = lsmStore.transaction(); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); FutureUtils.result(createTxn.execute()); // the log segment should be created assertNotNull("LogSegment " + segment + " should be created", @@ -304,7 +307,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { for (int i = 0; i < 10; i++) { LogSegmentMetadata segment = createLogSegment(i); createdSegments.add(segment); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); } FutureUtils.result(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); @@ -353,7 +356,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); } FutureUtils.result(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); @@ -394,7 +397,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> anotherCreateTxn = lsmStore.transaction(); for (int i = numSegments; i < 2 * numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(anotherCreateTxn, segment); + lsmStore.createLogSegment(anotherCreateTxn, segment, null); } FutureUtils.result(anotherCreateTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); @@ -419,7 +422,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); } FutureUtils.result(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); @@ -459,7 +462,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> deleteTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.deleteLogSegment(deleteTxn, segment); + lsmStore.deleteLogSegment(deleteTxn, segment, null); } FutureUtils.result(deleteTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); @@ -491,7 +494,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); } FutureUtils.result(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); @@ -536,7 +539,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> anotherCreateTxn = lsmStore.transaction(); for (int i = numSegments; i < 2 * numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(anotherCreateTxn, segment); + lsmStore.createLogSegment(anotherCreateTxn, segment, null); } FutureUtils.result(anotherCreateTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); @@ -561,7 +564,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> createTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.createLogSegment(createTxn, segment); + lsmStore.createLogSegment(createTxn, segment, null); } FutureUtils.result(createTxn.execute()); String rootPath = "/" + runtime.getMethodName(); @@ -602,7 +605,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> deleteTxn = lsmStore.transaction(); for (int i = 0; i < numSegments; i++) { LogSegmentMetadata segment = createLogSegment(i); - lsmStore.deleteLogSegment(deleteTxn, segment); + lsmStore.deleteLogSegment(deleteTxn, segment, null); } FutureUtils.result(deleteTxn.execute()); List<String> newChildren = zkc.get().getChildren(rootPath, false); @@ -634,7 +637,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); final Promise<Version> result = new Promise<Version>(); - lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value, + ZKLogMetadata metadata = mock(ZKLogMetadata.class); + when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); + lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { @@ -659,7 +664,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); - lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value, + ZKLogMetadata metadata = mock(ZKLogMetadata.class); + when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath); + lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { @@ -695,7 +702,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; - lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value, + ZKLogMetadata metadata = mock(ZKLogMetadata.class); + when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath); + lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { @@ -726,7 +735,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0)); final Promise<Version> result = new Promise<Version>(); - lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value, + ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); + lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { @@ -751,7 +762,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Transaction<Object> updateTxn = lsmStore.transaction(); Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); - lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value, + ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath); + lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) { @@ -787,7 +800,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase { Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10)); final Promise<Version> result = new Promise<Version>(); String nonExistentPath = rootZkPath + "/non-existent"; - lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value, + ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class); + when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath); + lsmStore.storeMaxTxnId(updateTxn, metadata, value, new Transaction.OpListener<Version>() { @Override public void onCommit(Version r) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java deleted file mode 100644 index 648b828..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.twitter.distributedlog.TestZooKeeperClientBuilder; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.metadata.DLMetadata; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DLMTestUtil; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClusterTestCase; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Transaction; -import org.apache.zookeeper.ZooDefs; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.URI; -import java.util.List; - -import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; -import static org.junit.Assert.*; - -/** - * Test {@link ZKLogMetadataForWriter} - */ -public class TestZKLogMetadataForWriter extends ZooKeeperClusterTestCase { - - private static final Logger logger = LoggerFactory.getLogger(TestZKLogMetadataForWriter.class); - - private final static int sessionTimeoutMs = 30000; - - @Rule - public TestName testName = new TestName(); - - private ZooKeeperClient zkc; - private URI uri; - - private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier) - throws Exception { - final String logRootPath = getLogRootPath(uri, logName, logIdentifier); - final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; - final String maxTxIdPath = logRootPath + MAX_TXID_PATH; - final String lockPath = logRootPath + LOCK_PATH; - final String readLockPath = logRootPath + READ_LOCK_PATH; - final String versionPath = logRootPath + VERSION_PATH; - final String allocationPath = logRootPath + ALLOCATION_PATH; - - Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0], - zk.getDefaultACL(), CreateMode.PERSISTENT); - Transaction txn = zk.get().transaction(); - txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber( - DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO), - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L), - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES, - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES, - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.create(versionPath, ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION), - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES, - zk.getDefaultACL(), CreateMode.PERSISTENT); - txn.commit(); - } - - @Before - public void setup() throws Exception { - zkc = TestZooKeeperClientBuilder.newBuilder() - .name("zkc") - .uri(DLMTestUtil.createDLMURI(zkPort, "/")) - .sessionTimeoutMs(sessionTimeoutMs) - .build(); - uri = DLMTestUtil.createDLMURI(zkPort, ""); - try { - ZkUtils.createFullPathOptimistic( - zkc.get(), - uri.getPath(), - new byte[0], - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException nee) { - logger.debug("The namespace uri already exists."); - } - } - - @After - public void teardown() throws Exception { - zkc.close(); - } - - @Test(timeout = 60000) - public void testCheckLogMetadataPathsWithAllocator() throws Exception { - String logRootPath = "/" + testName.getMethodName(); - List<Versioned<byte[]>> metadatas = - FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths( - zkc.get(), logRootPath, true)); - assertEquals("Should have 8 paths", - 8, metadatas.size()); - for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) { - assertNull(path.getValue()); - assertNull(path.getVersion()); - } - } - - @Test(timeout = 60000) - public void testCheckLogMetadataPathsWithoutAllocator() throws Exception { - String logRootPath = "/" + testName.getMethodName(); - List<Versioned<byte[]>> metadatas = - FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths( - zkc.get(), logRootPath, false)); - assertEquals("Should have 7 paths", - 7, metadatas.size()); - for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) { - assertNull(path.getValue()); - assertNull(path.getVersion()); - } - } - - private void testCreateLogMetadataWithMissingPaths(URI uri, - String logName, - String logIdentifier, - List<String> pathsToDelete, - boolean ownAllocator, - boolean createLogFirst) - throws Exception { - if (createLogFirst) { - createLog(zkc, uri, logName, logIdentifier); - } - // delete a path - for (String path : pathsToDelete) { - zkc.get().delete(path, -1); - } - - ZKLogMetadataForWriter logMetadata = - FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier, - zkc.get(), zkc.getDefaultACL(), ownAllocator, true)); - - final String logRootPath = getLogRootPath(uri, logName, logIdentifier); - - List<Versioned<byte[]>> metadatas = - FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator)); - - if (ownAllocator) { - assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator, - 8, metadatas.size()); - } else { - assertEquals("Should have 7 paths : ownAllocator = " + ownAllocator, - 7, metadatas.size()); - } - - for (Versioned<byte[]> metadata : metadatas) { - assertTrue(ZKLogMetadataForWriter.pathExists(metadata)); - assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0); - } - - Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData(); - - assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, - DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue())); - - Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData(); - - assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue())); - - if (ownAllocator) { - Versioned<byte[]> allocationData = logMetadata.getAllocationData(); - assertEquals(0, allocationData.getValue().length); - } - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + LOGSEGMENTS_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + MAX_TXID_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingLockPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + LOCK_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingReadLockPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + READ_LOCK_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingVersionPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + VERSION_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingAllocatorPath() throws Exception { - URI uri = DLMTestUtil.createDLMURI(zkPort, ""); - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + ALLOCATION_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataMissingAllPath() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - String logRootPath = getLogRootPath(uri, logName, logIdentifier); - List<String> pathsToDelete = Lists.newArrayList( - logRootPath + LOGSEGMENTS_PATH, - logRootPath + MAX_TXID_PATH, - logRootPath + LOCK_PATH, - logRootPath + READ_LOCK_PATH, - logRootPath + VERSION_PATH, - logRootPath + ALLOCATION_PATH); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataOnExistedLog() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - List<String> pathsToDelete = Lists.newArrayList(); - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); - } - - @Test(timeout = 60000) - public void testCreateLogMetadata() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - List<String> pathsToDelete = Lists.newArrayList(); - - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false); - } - - @Test(timeout = 60000, expected = LogNotFoundException.class) - public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier, - zkc.get(), zkc.getDefaultACL(), true, false)); - } - - @Test(timeout = 60000) - public void testCreateLogMetadataWithCustomMetadata() throws Exception { - String logName = testName.getMethodName(); - String logIdentifier = "<default>"; - List<String> pathsToDelete = Lists.newArrayList(); - - DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri); - - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(new DistributedLogConfiguration()) - .uri(uri) - .build(); - - DistributedLogManager dlm = namespace.openLog(logName); - dlm.createOrUpdateMetadata(logName.getBytes("UTF-8")); - dlm.close(); - - testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java deleted file mode 100644 index ce67c30..0000000 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DLMTestUtil; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.util.DLUtils; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.versioning.Versioned; -import org.junit.Test; - -import java.net.URI; -import java.util.List; - -import static org.junit.Assert.*; - -public class TestZKLogMetadataForWriterUtilFunctions { - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingMaxTxnId() throws Exception { - String rootPath = "/test-missing-max-txn-id"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingVersion() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasWrongVersion() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(9999), null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingLockPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingReadLockPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000, expected = UnexpectedException.class) - public void testProcessLogMetadatasMissingAllocatorPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)), - new Versioned<byte[]>(null, null)); - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000) - public void testProcessLogMetadatasNoAllocatorPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - Versioned<byte[]> maxTxnIdData = - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)); - Versioned<byte[]> logSegmentsData = - new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)); - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - maxTxnIdData, - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - logSegmentsData); - ZKLogMetadataForWriter metadata = - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false); - assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); - assertTrue(logSegmentsData == metadata.getMaxLSSNData()); - assertNull(metadata.getAllocationData().getValue()); - assertNull(metadata.getAllocationData().getVersion()); - } - - @SuppressWarnings("unchecked") - @Test(timeout = 60000) - public void testProcessLogMetadatasAllocatorPath() throws Exception { - String rootPath = "/test-missing-version"; - URI uri = DLMTestUtil.createDLMURI(2181, rootPath); - String logName = "test-log"; - String logIdentifier = "<default>"; - Versioned<byte[]> maxTxnIdData = - new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)); - Versioned<byte[]> logSegmentsData = - new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)); - Versioned<byte[]> allocationData = - new Versioned<byte[]>(DLUtils.ledgerId2Bytes(1L), new ZkVersion(1)); - List<Versioned<byte[]>> metadatas = Lists.newArrayList( - new Versioned<byte[]>(null, null), - new Versioned<byte[]>(null, null), - maxTxnIdData, - new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - new Versioned<byte[]>(new byte[0], new ZkVersion(1)), - logSegmentsData, - allocationData); - ZKLogMetadataForWriter metadata = - ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true); - assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); - assertTrue(logSegmentsData == metadata.getMaxLSSNData()); - assertTrue(allocationData == metadata.getAllocationData()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java new file mode 100644 index 0000000..9a08aa0 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.metadata; + +import com.twitter.distributedlog.TestZooKeeperClientBuilder; +import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.metadata.DLMetadata; +import com.google.common.collect.Lists; +import com.twitter.distributedlog.DLMTestUtil; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.DistributedLogManager; +import com.twitter.distributedlog.DistributedLogConstants; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.ZooKeeperClusterTestCase; +import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.Utils; +import org.apache.bookkeeper.meta.ZkVersion; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.ZooDefs; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.List; + +import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*; +import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*; +import static org.junit.Assert.*; + +/** + * Test {@link ZKLogStreamMetadataStore} + */ +public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class); + + private final static int sessionTimeoutMs = 30000; + + @Rule + public TestName testName = new TestName(); + + private ZooKeeperClient zkc; + private URI uri; + + private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier) + throws Exception { + final String logRootPath = getLogRootPath(uri, logName, logIdentifier); + final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; + final String maxTxIdPath = logRootPath + MAX_TXID_PATH; + final String lockPath = logRootPath + LOCK_PATH; + final String readLockPath = logRootPath + READ_LOCK_PATH; + final String versionPath = logRootPath + VERSION_PATH; + final String allocationPath = logRootPath + ALLOCATION_PATH; + + Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0], + zk.getDefaultACL(), CreateMode.PERSISTENT); + Transaction txn = zk.get().transaction(); + txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber( + DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO), + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L), + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES, + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES, + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.create(versionPath, intToBytes(LAYOUT_VERSION), + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES, + zk.getDefaultACL(), CreateMode.PERSISTENT); + txn.commit(); + } + + @Before + public void setup() throws Exception { + zkc = TestZooKeeperClientBuilder.newBuilder() + .name("zkc") + .uri(DLMTestUtil.createDLMURI(zkPort, "/")) + .sessionTimeoutMs(sessionTimeoutMs) + .build(); + uri = DLMTestUtil.createDLMURI(zkPort, ""); + try { + ZkUtils.createFullPathOptimistic( + zkc.get(), + uri.getPath(), + new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + logger.debug("The namespace uri already exists."); + } + } + + @After + public void teardown() throws Exception { + zkc.close(); + } + + @Test(timeout = 60000) + public void testCheckLogMetadataPathsWithAllocator() throws Exception { + String logRootPath = "/" + testName.getMethodName(); + List<Versioned<byte[]>> metadatas = + FutureUtils.result(checkLogMetadataPaths( + zkc.get(), logRootPath, true)); + assertEquals("Should have 8 paths", + 8, metadatas.size()); + for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) { + assertNull(path.getValue()); + assertNull(path.getVersion()); + } + } + + @Test(timeout = 60000) + public void testCheckLogMetadataPathsWithoutAllocator() throws Exception { + String logRootPath = "/" + testName.getMethodName(); + List<Versioned<byte[]>> metadatas = + FutureUtils.result(checkLogMetadataPaths( + zkc.get(), logRootPath, false)); + assertEquals("Should have 7 paths", + 7, metadatas.size()); + for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) { + assertNull(path.getValue()); + assertNull(path.getVersion()); + } + } + + private void testCreateLogMetadataWithMissingPaths(URI uri, + String logName, + String logIdentifier, + List<String> pathsToDelete, + boolean ownAllocator, + boolean createLogFirst) + throws Exception { + if (createLogFirst) { + createLog(zkc, uri, logName, logIdentifier); + } + // delete a path + for (String path : pathsToDelete) { + zkc.get().delete(path, -1); + } + + ZKLogMetadataForWriter logMetadata = + FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true)); + + final String logRootPath = getLogRootPath(uri, logName, logIdentifier); + + List<Versioned<byte[]>> metadatas = + FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator)); + + if (ownAllocator) { + assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator, + 8, metadatas.size()); + } else { + assertEquals("Should have 7 paths : ownAllocator = " + ownAllocator, + 7, metadatas.size()); + } + + for (Versioned<byte[]> metadata : metadatas) { + assertTrue(pathExists(metadata)); + assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0); + } + + Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData(); + + assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO, + DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue())); + + Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData(); + + assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue())); + + if (ownAllocator) { + Versioned<byte[]> allocationData = logMetadata.getAllocationData(); + assertEquals(0, allocationData.getValue().length); + } + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + LOGSEGMENTS_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + MAX_TXID_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingLockPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + LOCK_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingReadLockPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + READ_LOCK_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingVersionPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + VERSION_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingAllocatorPath() throws Exception { + URI uri = DLMTestUtil.createDLMURI(zkPort, ""); + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + ALLOCATION_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataMissingAllPath() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + String logRootPath = getLogRootPath(uri, logName, logIdentifier); + List<String> pathsToDelete = Lists.newArrayList( + logRootPath + LOGSEGMENTS_PATH, + logRootPath + MAX_TXID_PATH, + logRootPath + LOCK_PATH, + logRootPath + READ_LOCK_PATH, + logRootPath + VERSION_PATH, + logRootPath + ALLOCATION_PATH); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataOnExistedLog() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + List<String> pathsToDelete = Lists.newArrayList(); + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true); + } + + @Test(timeout = 60000) + public void testCreateLogMetadata() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + List<String> pathsToDelete = Lists.newArrayList(); + + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false); + } + + @Test(timeout = 60000, expected = LogNotFoundException.class) + public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, false)); + } + + @Test(timeout = 60000) + public void testCreateLogMetadataWithCustomMetadata() throws Exception { + String logName = testName.getMethodName(); + String logIdentifier = "<default>"; + List<String> pathsToDelete = Lists.newArrayList(); + + DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri); + + DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(new DistributedLogConfiguration()) + .uri(uri) + .build(); + + DistributedLogManager dlm = namespace.openLog(logName); + dlm.createOrUpdateMetadata(logName.getBytes("UTF-8")); + dlm.close(); + + testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java new file mode 100644 index 0000000..f14a217 --- /dev/null +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.metadata; + +import com.google.common.collect.Lists; +import com.twitter.distributedlog.DLMTestUtil; +import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.util.DLUtils; +import org.apache.bookkeeper.meta.ZkVersion; +import org.apache.bookkeeper.versioning.Versioned; +import org.junit.Test; + +import java.net.URI; +import java.util.List; + +import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*; +import static org.junit.Assert.*; + +public class TestZKLogStreamMetadataStoreUtils { + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingMaxTxnId() throws Exception { + String rootPath = "/test-missing-max-txn-id"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingVersion() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasWrongVersion() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(intToBytes(9999), null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingLockPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingReadLockPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000, expected = UnexpectedException.class) + public void testProcessLogMetadatasMissingAllocatorPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)), + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)), + new Versioned<byte[]>(null, null)); + processLogMetadatas(uri, logName, logIdentifier, metadatas, true); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000) + public void testProcessLogMetadatasNoAllocatorPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + Versioned<byte[]> maxTxnIdData = + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)); + Versioned<byte[]> logSegmentsData = + new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)); + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + maxTxnIdData, + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + logSegmentsData); + ZKLogMetadataForWriter metadata = + processLogMetadatas(uri, logName, logIdentifier, metadatas, false); + assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); + assertTrue(logSegmentsData == metadata.getMaxLSSNData()); + assertNull(metadata.getAllocationData().getValue()); + assertNull(metadata.getAllocationData().getVersion()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 60000) + public void testProcessLogMetadatasAllocatorPath() throws Exception { + String rootPath = "/test-missing-version"; + URI uri = DLMTestUtil.createDLMURI(2181, rootPath); + String logName = "test-log"; + String logIdentifier = "<default>"; + Versioned<byte[]> maxTxnIdData = + new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)); + Versioned<byte[]> logSegmentsData = + new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)); + Versioned<byte[]> allocationData = + new Versioned<byte[]>(DLUtils.ledgerId2Bytes(1L), new ZkVersion(1)); + List<Versioned<byte[]>> metadatas = Lists.newArrayList( + new Versioned<byte[]>(null, null), + new Versioned<byte[]>(null, null), + maxTxnIdData, + new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + new Versioned<byte[]>(new byte[0], new ZkVersion(1)), + logSegmentsData, + allocationData); + ZKLogMetadataForWriter metadata = + processLogMetadatas(uri, logName, logIdentifier, metadatas, true); + assertTrue(maxTxnIdData == metadata.getMaxTxIdData()); + assertTrue(logSegmentsData == metadata.getMaxLSSNData()); + assertTrue(allocationData == metadata.getAllocationData()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java index 8899c0e..db87a65 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog.util; +import com.twitter.distributedlog.zk.LimitedPermitManager; import org.junit.Test; import java.util.ArrayList;