This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d68829c Add properties for managed-ledger. (#7054) d68829c is described below commit d68829c00101649f41ab78cb808ce7da02bbd9ec Author: lipenghui <peng...@apache.org> AuthorDate: Wed May 27 20:59:16 2020 +0800 Add properties for managed-ledger. (#7054) ### Motivation Add properties for managed-ledger. We can use properties to store some metadata such as some metadata that protocol handler need to persist. ### Verifying this change New unit tests added --- .../apache/bookkeeper/mledger/AsyncCallbacks.java | 7 ++ .../apache/bookkeeper/mledger/ManagedLedger.java | 24 ++++++ .../bookkeeper/mledger/ManagedLedgerInfo.java | 2 + .../mledger/impl/ManagedLedgerFactoryImpl.java | 8 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++ .../bookkeeper/mledger/impl/MetaStoreImpl.java | 1 + managed-ledger/src/main/proto/MLDataFormats.proto | 2 + .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 93 ++++++++++++++++++++++ 8 files changed, 210 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 8a21385..0add10f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -137,4 +138,10 @@ public interface AsyncCallbacks { void offloadFailed(ManagedLedgerException exception, Object ctx); } + + interface SetPropertiesCallback { + void setPropertiesComplete(Map<String, String> properties, Object ctx); + + void setPropertiesFailed(ManagedLedgerException exception, Object ctx); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index d1174f6..714dd0e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -444,6 +444,30 @@ public interface ManagedLedger { void readyToCreateNewLedger(); /** + * Returns managed-ledger's properties. + * + * @return key-values of properties + */ + Map<String, String> getProperties(); + + /** + * Update managed-ledger's properties. + * + * @param properties key-values of properties + */ + void setProperties(Map<String, String> properties) throws InterruptedException; + + /** + * Async update managed-ledger's properties. + * + * @param properties key-values of properties. + * @param callback a callback which will be supplied with the newest properties in managedLedger. + * @param ctx a context object which will be passed to the callback on completion. + */ + void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback, + Object ctx); + + /** * Trim consumed ledgers in background * @param promise */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java index c03da89..2afd119 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java @@ -33,6 +33,8 @@ public class ManagedLedgerInfo { public Map<String, CursorInfo> cursors; + public Map<String, String> properties; + public static class LedgerInfo { public long ledgerId; public Long entries; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 66ad3ba..bf581e7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -519,6 +519,14 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { info.terminatedPosition.entryId = pbInfo.getTerminatedPosition().getEntryId(); } + if (pbInfo.getPropertiesCount() > 0) { + info.properties = Maps.newTreeMap(); + for (int i = 0; i < pbInfo.getPropertiesCount(); i++) { + MLDataFormats.KeyValue property = pbInfo.getProperties(i); + info.properties.put(property.getKey(), property.getValue()); + } + } + for (int i = 0; i < pbInfo.getLedgerInfoCount(); i++) { MLDataFormats.ManagedLedgerInfo.LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfo(i); LedgerInfo ledgerInfo = new LedgerInfo(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c7dc19d..e70b60c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -109,6 +110,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.offload.OffloadUtils; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; @@ -133,6 +135,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final BookKeeper.DigestType digestType; protected ManagedLedgerConfig config; + protected Map<String, String> propertiesMap; protected final MetaStore store; private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>( @@ -280,6 +283,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; + this.propertiesMap = Maps.newHashMap(); } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { @@ -300,6 +304,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ledgers.put(ls.getLedgerId(), ls); } + if (mlInfo.getPropertiesCount() > 0) { + propertiesMap = Maps.newHashMap(); + for (int i = 0; i < mlInfo.getPropertiesCount(); i++) { + MLDataFormats.KeyValue property = mlInfo.getProperties(i); + propertiesMap.put(property.getKey(), property.getValue()); + } + } + // Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0) { final long id = ledgers.lastKey(); @@ -3174,6 +3186,67 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return offloadedSize; } + @Override + public Map<String, String> getProperties() { + return propertiesMap; + } + + @Override + public void setProperties(Map<String, String> properties) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + this.asyncSetProperties(properties, new SetPropertiesCallback() { + @Override + public void setPropertiesComplete(Map<String, String> properties, Object ctx) { + latch.countDown(); + } + + @Override + public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) { + log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage()); + latch.countDown(); + } + }, null); + + latch.await(); + } + + @Override + public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) { + store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() { + @Override + public void operationComplete(ManagedLedgerInfo result, Stat version) { + ledgersStat = version; + // Update manageLedger's properties. + ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result); + info.clearProperties(); + for (Map.Entry<String, String> property : properties.entrySet()) { + info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue())); + } + store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() { + @Override + public void operationComplete(Void result, Stat version) { + ledgersStat = version; + propertiesMap.clear(); + propertiesMap.putAll(properties); + callback.setPropertiesComplete(properties, ctx); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage()); + callback.setPropertiesFailed(e, ctx); + } + }); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage()); + callback.setPropertiesFailed(e, ctx); + } + }); + } + @VisibleForTesting public void setEntriesAddedCounter(long count) { ENTRIES_ADDED_COUNTER_UPDATER.set(this, count); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 50cf13a..caa21a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -253,6 +253,7 @@ public class MetaStoreImpl implements MetaStore { if (info.hasTerminatedPosition()) { mlInfo.setTerminatedPosition(info.getTerminatedPosition()); } + mlInfo.addAllProperties(info.getPropertiesList()); return mlInfo.build(); } diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 4dbd231..151cd69 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -56,6 +56,8 @@ message ManagedLedgerInfo { // committed entry. // No more entries can be written. optional NestedPositionInfo terminatedPosition = 2; + + repeated KeyValue properties = 3; } message PositionInfo { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a5e6408..2b02e0b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -47,6 +47,7 @@ import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -75,6 +76,7 @@ import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; @@ -97,6 +99,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; @@ -1170,6 +1173,52 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { } @Test + public void testSetProperties() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger"); + Map<String, String> properties = new HashMap<>(); + properties.put("key1", "value1"); + properties.put("key2", "value2"); + properties.put("key3", "value3"); + ledger.setProperties(properties); + assertEquals(ledger.getProperties(), properties); + + Map<String, String> newProperties = new HashMap<>(); + newProperties.put("key4", "value4"); + newProperties.put("key5", "value5"); + newProperties.put("key6", "value6"); + ledger.setProperties(newProperties); + assertEquals(ledger.getProperties(), newProperties); + } + + @Test + public void testAsyncSetProperties() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + ManagedLedger ledger = factory.open("my_test_ledger"); + Map<String, String> properties = new HashMap<>(); + properties.put("key1", "value1"); + properties.put("key2", "value2"); + properties.put("key3", "value3"); + ledger.setProperties(properties); + Map<String, String> newProperties = new HashMap<>(); + newProperties.put("key4", "value4"); + newProperties.put("key5", "value5"); + newProperties.put("key6", "value6"); + ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() { + @Override + public void setPropertiesComplete(Map<String, String> properties, Object ctx) { + latch.countDown(); + } + + @Override + public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) { + fail("should have succeeded"); + } + }, null); + latch.await(); + assertEquals(ledger.getProperties(), newProperties); + } + + @Test public void ledgersList() throws Exception { MetaStore store = factory.getMetaStore(); @@ -2522,6 +2571,50 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { return failed.getValue(); } + + @Test + public void testPropertiesForMeta() throws Exception { + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + + final String mLName = "properties_test"; + factory.open(mLName); + MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor); + + ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder(); + builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key1").setValue("value1").build()); + builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key2").setValue("value2").build()); + + CountDownLatch l2 = new CountDownLatch(1); + store.asyncUpdateLedgerIds(mLName, builder.build(), + new Stat(1, 0, 0), + new MetaStoreCallback<Void>() { + @Override + public void operationComplete(Void result, Stat version) { + l2.countDown(); + } + + @Override + public void operationFailed(MetaStoreException e) { + fail("on asyncUpdateLedgerIds"); + } + }); + + // get ManagedLedgerInfo from meta store + org.apache.bookkeeper.mledger.ManagedLedgerInfo managedLedgerInfo = factory.getManagedLedgerInfo(mLName); + Map<String, String> properties = managedLedgerInfo.properties; + assertEquals(properties.get("key1"), "value1"); + assertEquals(properties.get("key2"), "value2"); + + factory.shutdown(); + factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + + // reopen managedLedger + ManagedLedger ml = factory.open(mLName); + properties = ml.getProperties(); + assertEquals(properties.get("key1"), "value1"); + assertEquals(properties.get("key2"), "value2"); + } + private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject<ManagedLedger> ledger1, MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception { CountDownLatch latch = new CountDownLatch(1);