http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java index a7dead4..d1069c3 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java @@ -28,6 +28,7 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder; import com.twitter.distributedlog.ZooKeeperClient; import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; @@ -104,6 +105,10 @@ public class TestDLCK extends TestDistributedLogBase { zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); com.twitter.distributedlog.DistributedLogManagerFactory factory = new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri); + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("dlck-tool") + .corePoolSize(1) + .build(); ExecutorService executorService = Executors.newCachedThreadPool(); String streamName = "check-and-repair-dl-namespace"; @@ -119,7 +124,7 @@ public class TestDLCK extends TestDistributedLogBase { BookKeeperClient bkc = getBookKeeperClient(factory); DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory, new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)), - executorService, bkc, confLocal.getBKDigestPW(), false, false); + scheduler, bkc, confLocal.getBKDigestPW(), false, false); Map<Long, LogSegmentMetadata> segments = getLogSegments(dlm); LOG.info("segments after drynrun {}", segments); @@ -132,7 +137,7 @@ public class TestDLCK extends TestDistributedLogBase { bkc = getBookKeeperClient(factory); DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory, LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)), - executorService, bkc, confLocal.getBKDigestPW(), false, false); + scheduler, bkc, confLocal.getBKDigestPW(), false, false); segments = getLogSegments(dlm); LOG.info("segments after repair {}", segments);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java index 88840a0..4cf86fa 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java @@ -91,7 +91,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase { DistributedLogConfiguration conf) throws Exception { LogSegmentEntryStore store = new BKLogSegmentEntryStore( - conf, bkc.get(), scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); + conf, bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL); return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId)); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java index 645b666..20c81f3 100644 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java +++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java @@ -95,7 +95,7 @@ public class DistributedLogInputFormat final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null); for (LogSegmentMetadata segment : segments) { final CountDownLatch latch = new CountDownLatch(1); - lm.readLedgerMetadata(segment.getLedgerId(), + lm.readLedgerMetadata(segment.getLogSegmentId(), new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() { @Override public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java index 6dfc6aa..f8b98f7 100644 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java +++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java @@ -56,7 +56,7 @@ class LogSegmentReader extends RecordReader<DLSN, LogRecordWithDLSN> { this.metadata = split.getMetadata(); try { this.lh = bk.openLedgerNoRecovery( - split.getLedgerId(), + split.getLogSegmentId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); } catch (BKException e) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java index 58f3e9d..89e9d44 100644 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java +++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java @@ -53,8 +53,8 @@ public class LogSegmentSplit extends InputSplit implements Writable { return logSegmentMetadata; } - public long getLedgerId() { - return logSegmentMetadata.getLedgerId(); + public long getLogSegmentId() { + return logSegmentMetadata.getLogSegmentId(); } @Override