This is an automated email from the ASF dual-hosted git repository.
ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f49bd03 LedgerMetadata#parseConfig uses Metadata Builder
f49bd03 is described below
commit f49bd035db01f5b50df88c6c43329f7824984181
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Nov 22 02:42:45 2018 +0100
LedgerMetadata#parseConfig uses Metadata Builder
parseConfig is one of the main places that mutates member fields of
LedgerMetadata. This change removes those mutations so that parsing
builds its result through LedgerMetadataBuilder instead.
Master issue: #281
Reviewers: Sijie Guo <[email protected]>
This closes #1825 from ivankelly/parse-builder
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 4 +-
.../apache/bookkeeper/client/LedgerMetadata.java | 174 ++++++++++++---------
.../bookkeeper/client/LedgerMetadataBuilder.java | 43 +++--
.../bookkeeper/meta/AbstractZkLedgerManager.java | 2 +-
.../bookkeeper/meta/MSLedgerManagerFactory.java | 4 +-
.../bookkeeper/client/MetadataUpdateLoopTest.java | 9 +-
.../client/ParallelLedgerRecoveryTest.java | 3 +-
.../meta/AbstractZkLedgerManagerTest.java | 24 ++-
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 13 +-
.../apache/bookkeeper/meta/MockLedgerManager.java | 5 +-
.../metadata/etcd/EtcdLedgerManager.java | 6 +-
.../metadata/etcd/EtcdLedgerManagerTest.java | 16 +-
12 files changed, 186 insertions(+), 117 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index a4a437c..12d85e2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -25,7 +25,6 @@ import static
org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistra
import static
org.apache.bookkeeper.tools.cli.helpers.CommandHelpers.getBookieSocketAddrStringRepresentation;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -63,6 +62,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -1143,7 +1143,7 @@ public class BookieShell implements Tool {
} else if (cmdLine.hasOption("restorefromfile")) {
byte[] serialized = Files.readAllBytes(
FileSystems.getDefault().getPath(cmdLine.getOptionValue("restorefromfile")));
- LedgerMetadata md =
LedgerMetadata.parseConfig(serialized, Optional.absent());
+ LedgerMetadata md =
LedgerMetadata.parseConfig(serialized, Optional.empty());
m.createLedgerMetadata(lid, md).join();
} else {
printLedgerMetadata(lid,
m.readLedgerMetadata(lid).get().getValue(), true);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 620a798..90f0f13 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -22,7 +22,6 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
@@ -38,9 +37,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -72,14 +73,13 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
public static final String VERSION_KEY = "BookieMetadataFormatVersion";
private int metadataFormatVersion = 0;
-
private int ensembleSize;
private int writeQuorumSize;
private int ackQuorumSize;
private long length;
private long lastEntryId;
private long ctime;
- boolean storeSystemtimeAsLedgerCreationTime = false;
+ boolean storeCtime; // non-private so builder can access for copy
private LedgerMetadataFormat.State state;
private TreeMap<Long, ImmutableList<BookieSocketAddress>> ensembles = new
TreeMap<>();
@@ -103,12 +103,13 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
this.ackQuorumSize = ackQuorumSize;
if (storeSystemtimeAsLedgerCreationTime) {
this.ctime = System.currentTimeMillis();
+ this.storeCtime = storeSystemtimeAsLedgerCreationTime;
} else {
// if client disables storing its system time as ledger creation
time, there should be no ctime at this
// moment.
this.ctime = -1L;
+ this.storeCtime = false;
}
- this.storeSystemtimeAsLedgerCreationTime =
storeSystemtimeAsLedgerCreationTime;
/*
* It is set in PendingReadOp.readEntryComplete, and
@@ -128,19 +129,22 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
}
}
- LedgerMetadata(int ensembleSize,
+ LedgerMetadata(int metadataFormatVersion,
+ int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
LedgerMetadataFormat.State state,
- java.util.Optional<Long> lastEntryId,
- java.util.Optional<Long> length,
+ Optional<Long> lastEntryId,
+ Optional<Long> length,
Map<Long, List<BookieSocketAddress>> ensembles,
DigestType digestType,
- java.util.Optional<byte[]> password,
- java.util.Optional<Long> ctime,
+ Optional<byte[]> password,
+ long ctime,
+ boolean storeCtime,
Map<String, byte[]> customMetadata) {
checkArgument(ensembles.size() > 0, "There must be at least one
ensemble in the ledger");
+ this.metadataFormatVersion = metadataFormatVersion;
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
@@ -165,10 +169,8 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
this.hasPassword = true;
});
- ctime.ifPresent((c) -> {
- this.ctime = c;
- this.storeSystemtimeAsLedgerCreationTime = true;
- });
+ this.ctime = ctime;
+ this.storeCtime = storeCtime;
this.customMetadata.putAll(customMetadata);
}
@@ -196,7 +198,7 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
this.hasPassword = other.hasPassword;
this.digestType = other.digestType;
this.ctime = other.ctime;
- this.storeSystemtimeAsLedgerCreationTime =
other.storeSystemtimeAsLedgerCreationTime;
+ this.storeCtime = other.storeCtime;
this.password = new byte[other.password.length];
System.arraycopy(other.password, 0, this.password, 0,
other.password.length);
// copy the ensembles
@@ -243,11 +245,6 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
return ctime;
}
- @VisibleForTesting
- void setCtime(long ctime) {
- this.ctime = ctime;
- }
-
/**
* In versions 4.1.0 and below, the digest type and password were not
* stored in the metadata.
@@ -260,7 +257,11 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
@VisibleForTesting
public byte[] getPassword() {
- return Arrays.copyOf(password, password.length);
+ if (!hasPassword()) {
+ return new byte[0];
+ } else {
+ return Arrays.copyOf(password, password.length);
+ }
}
@Override
@@ -382,7 +383,7 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
.setEnsembleSize(ensembleSize).setLength(length)
.setState(state).setLastEntryId(lastEntryId);
- if (storeSystemtimeAsLedgerCreationTime) {
+ if (storeCtime) {
builder.setCtime(ctime);
}
@@ -467,15 +468,14 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
*
* @param bytes
* byte array to parse
- * @param msCtime
+ * @param metadataStoreCtime
* metadata store creation time, used for legacy ledgers
* @return LedgerConfig
* @throws IOException
* if the given byte[] cannot be parsed
*/
- public static LedgerMetadata parseConfig(byte[] bytes, Optional<Long>
msCtime) throws IOException {
- LedgerMetadata lc = new LedgerMetadata();
-
+ public static LedgerMetadata parseConfig(byte[] bytes,
+ Optional<Long>
metadataStoreCtime) throws IOException {
String config = new String(bytes, UTF_8);
if (LOG.isDebugEnabled()) {
@@ -486,89 +486,94 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
if (versionLine == null) {
throw new IOException("Invalid metadata. Content missing");
}
+ final int metadataFormatVersion;
if (versionLine.startsWith(VERSION_KEY)) {
String parts[] = versionLine.split(tSplitter);
- lc.metadataFormatVersion = Integer.parseInt(parts[1]);
+ metadataFormatVersion = Integer.parseInt(parts[1]);
} else {
// if no version is set, take it to be version 1
// as the parsing is the same as what we had before
// we introduce versions
- lc.metadataFormatVersion = 1;
+ metadataFormatVersion = 1;
// reset the reader
reader.close();
reader = new BufferedReader(new StringReader(config));
}
- if (lc.metadataFormatVersion < LOWEST_COMPAT_METADATA_FORMAT_VERSION
- || lc.metadataFormatVersion > CURRENT_METADATA_FORMAT_VERSION) {
- throw new IOException("Metadata version not compatible. Expected
between "
- + LOWEST_COMPAT_METADATA_FORMAT_VERSION + " and " +
CURRENT_METADATA_FORMAT_VERSION
- + ", but got " + lc.metadataFormatVersion);
+ if (metadataFormatVersion < LOWEST_COMPAT_METADATA_FORMAT_VERSION
+ || metadataFormatVersion > CURRENT_METADATA_FORMAT_VERSION) {
+ throw new IOException(
+ String.format("Metadata version not compatible. Expected
between %d and %d, but got %d",
+ LOWEST_COMPAT_METADATA_FORMAT_VERSION,
CURRENT_METADATA_FORMAT_VERSION,
+ metadataFormatVersion));
}
- if (lc.metadataFormatVersion == 1) {
- return parseVersion1Config(lc, reader);
+ if (metadataFormatVersion == 1) {
+ return parseVersion1Config(reader);
}
+ LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+ .withMetadataFormatVersion(metadataFormatVersion);
+
// remaining size is total minus the length of the version line and
'\n'
char[] configBuffer = new char[config.length() - (versionLine.length()
+ 1)];
if (configBuffer.length != reader.read(configBuffer, 0,
configBuffer.length)) {
throw new IOException("Invalid metadata buffer");
}
- LedgerMetadataFormat.Builder builder =
LedgerMetadataFormat.newBuilder();
+ LedgerMetadataFormat.Builder formatBuilder =
LedgerMetadataFormat.newBuilder();
+ TextFormat.merge((CharSequence) CharBuffer.wrap(configBuffer),
formatBuilder);
+ LedgerMetadataFormat data = formatBuilder.build();
- TextFormat.merge((CharSequence) CharBuffer.wrap(configBuffer),
builder);
- LedgerMetadataFormat data = builder.build();
- lc.writeQuorumSize = data.getQuorumSize();
- if (data.hasCtime()) {
- lc.ctime = data.getCtime();
- lc.storeSystemtimeAsLedgerCreationTime = true;
- } else if (msCtime.isPresent()) {
- lc.ctime = msCtime.get();
- lc.storeSystemtimeAsLedgerCreationTime = false;
- }
+ builder.withEnsembleSize(data.getEnsembleSize());
+ builder.withWriteQuorumSize(data.getQuorumSize());
if (data.hasAckQuorumSize()) {
- lc.ackQuorumSize = data.getAckQuorumSize();
+ builder.withAckQuorumSize(data.getAckQuorumSize());
} else {
- lc.ackQuorumSize = lc.writeQuorumSize;
+ builder.withAckQuorumSize(data.getQuorumSize());
+ }
+
+ if (data.hasCtime()) {
+
builder.withCreationTime(data.getCtime()).storingCreationTime(true);
+ } else if (metadataStoreCtime.isPresent()) {
+
builder.withCreationTime(metadataStoreCtime.get()).storingCreationTime(false);
}
- lc.ensembleSize = data.getEnsembleSize();
- lc.length = data.getLength();
- lc.state = data.getState();
- lc.lastEntryId = data.getLastEntryId();
+ if (data.getState() == LedgerMetadataFormat.State.IN_RECOVERY) {
+ builder.withInRecoveryState();
+ } else if (data.getState() == LedgerMetadataFormat.State.CLOSED) {
+ builder.closingAt(data.getLastEntryId(), data.getLength());
+ }
if (data.hasPassword()) {
- lc.digestType = data.getDigestType();
- lc.password = data.getPassword().toByteArray();
- lc.hasPassword = true;
+ builder.withPassword(data.getPassword().toByteArray())
+ .withDigestType(protoToApiDigestType(data.getDigestType()));
}
for (LedgerMetadataFormat.Segment s : data.getSegmentList()) {
- ArrayList<BookieSocketAddress> addrs = new
ArrayList<BookieSocketAddress>();
- for (String member : s.getEnsembleMemberList()) {
- addrs.add(new BookieSocketAddress(member));
+ List<BookieSocketAddress> addrs = new ArrayList<>();
+ for (String addr : s.getEnsembleMemberList()) {
+ addrs.add(new BookieSocketAddress(addr));
}
- lc.addEnsemble(s.getFirstEntryId(), addrs);
+ builder.newEnsembleEntry(s.getFirstEntryId(), addrs);
}
if (data.getCustomMetadataCount() > 0) {
- List<LedgerMetadataFormat.cMetadataMapEntry> cMetadataList =
data.getCustomMetadataList();
- lc.customMetadata = Maps.newHashMap();
- for (LedgerMetadataFormat.cMetadataMapEntry ent : cMetadataList) {
- lc.customMetadata.put(ent.getKey(),
ent.getValue().toByteArray());
- }
+
builder.withCustomMetadata(data.getCustomMetadataList().stream().collect(
+ Collectors.toMap(e ->
e.getKey(),
+ e ->
e.getValue().toByteArray())));
}
- return lc;
+ return builder.build();
}
- static LedgerMetadata parseVersion1Config(LedgerMetadata lc,
- BufferedReader reader) throws
IOException {
+ static LedgerMetadata parseVersion1Config(BufferedReader reader) throws
IOException {
+ LedgerMetadataBuilder builder =
LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
try {
- lc.writeQuorumSize = lc.ackQuorumSize =
Integer.parseInt(reader.readLine());
- lc.ensembleSize = Integer.parseInt(reader.readLine());
- lc.length = Long.parseLong(reader.readLine());
+ int quorumSize = Integer.parseInt(reader.readLine());
+ int ensembleSize = Integer.parseInt(reader.readLine());
+ long length = Long.parseLong(reader.readLine());
+
+
builder.withEnsembleSize(ensembleSize).withWriteQuorumSize(quorumSize).withAckQuorumSize(quorumSize);
String line = reader.readLine();
while (line != null) {
@@ -577,27 +582,25 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
if (parts[1].equals(closed)) {
Long l = Long.parseLong(parts[0]);
if (l == IN_RECOVERY) {
- lc.state = LedgerMetadataFormat.State.IN_RECOVERY;
+ builder.withInRecoveryState();
} else {
- lc.state = LedgerMetadataFormat.State.CLOSED;
- lc.lastEntryId = l;
+ builder.closingAt(l, length);
}
break;
- } else {
- lc.state = LedgerMetadataFormat.State.OPEN;
}
ArrayList<BookieSocketAddress> addrs = new
ArrayList<BookieSocketAddress>();
for (int j = 1; j < parts.length; j++) {
addrs.add(new BookieSocketAddress(parts[j]));
}
- lc.addEnsemble(Long.parseLong(parts[0]), addrs);
+ builder.newEnsembleEntry(Long.parseLong(parts[0]), addrs);
+
line = reader.readLine();
}
+ return builder.build();
} catch (NumberFormatException e) {
throw new IOException(e);
}
- return lc;
}
/**
@@ -691,4 +694,23 @@ public class LedgerMetadata implements
org.apache.bookkeeper.client.api.LedgerMe
checkState(!ensembles.isEmpty(), "Metadata should never be created
with no ensembles");
return ensembles.lastKey();
}
+
+ int getMetadataFormatVersion() {
+ return metadataFormatVersion;
+ }
+
+ private static DigestType
protoToApiDigestType(LedgerMetadataFormat.DigestType digestType) {
+ switch (digestType) {
+ case HMAC:
+ return DigestType.MAC;
+ case CRC32:
+ return DigestType.CRC32;
+ case CRC32C:
+ return DigestType.CRC32C;
+ case DUMMY:
+ return DigestType.DUMMY;
+ default:
+ throw new IllegalArgumentException("Unable to convert digest type
" + digestType);
+ }
+ }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 8206438..74ec717 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -43,6 +43,7 @@ import
org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
@Unstable
@VisibleForTesting
public class LedgerMetadataBuilder {
+ private int metadataFormatVersion =
LedgerMetadata.CURRENT_METADATA_FORMAT_VERSION;
private int ensembleSize = 3;
private int writeQuorumSize = 3;
private int ackQuorumSize = 2;
@@ -56,7 +57,8 @@ public class LedgerMetadataBuilder {
private DigestType digestType = DigestType.CRC32C;
private Optional<byte[]> password = Optional.empty();
- private Optional<Long> ctime = Optional.empty();
+ private long ctime = -1;
+ private boolean storeCtime = false;
private Map<String, byte[]> customMetadata = Collections.emptyMap();
public static LedgerMetadataBuilder create() {
@@ -65,6 +67,7 @@ public class LedgerMetadataBuilder {
public static LedgerMetadataBuilder from(LedgerMetadata other) {
LedgerMetadataBuilder builder = new LedgerMetadataBuilder();
+ builder.metadataFormatVersion = other.getMetadataFormatVersion();
builder.ensembleSize = other.getEnsembleSize();
builder.writeQuorumSize = other.getWriteQuorumSize();
builder.ackQuorumSize = other.getAckQuorumSize();
@@ -87,14 +90,19 @@ public class LedgerMetadataBuilder {
builder.password = Optional.of(other.getPassword());
}
- if (other.storeSystemtimeAsLedgerCreationTime) {
- builder.ctime = Optional.of(other.getCtime());
- }
+ builder.ctime = other.getCtime();
+ builder.storeCtime = other.storeCtime;
+
builder.customMetadata =
ImmutableMap.copyOf(other.getCustomMetadata());
return builder;
}
+ public LedgerMetadataBuilder withMetadataFormatVersion(int version) {
+ this.metadataFormatVersion = version;
+ return this;
+ }
+
public LedgerMetadataBuilder withPassword(byte[] password) {
this.password = Optional.of(Arrays.copyOf(password, password.length));
return this;
@@ -112,14 +120,11 @@ public class LedgerMetadataBuilder {
}
public LedgerMetadataBuilder withWriteQuorumSize(int writeQuorumSize) {
- checkArgument(ensembleSize >= writeQuorumSize, "Write quorum must be
less or equal to ensemble size");
- checkArgument(writeQuorumSize >= ackQuorumSize, "Write quorum must be
greater or equal to ack quorum");
this.writeQuorumSize = writeQuorumSize;
return this;
}
public LedgerMetadataBuilder withAckQuorumSize(int ackQuorumSize) {
- checkArgument(writeQuorumSize >= ackQuorumSize, "Ack quorum must be
less or equal to write quorum");
this.ackQuorumSize = ackQuorumSize;
return this;
}
@@ -154,10 +159,30 @@ public class LedgerMetadataBuilder {
return this;
}
+ public LedgerMetadataBuilder withCustomMetadata(Map<String, byte[]>
customMetadata) {
+ this.customMetadata = ImmutableMap.copyOf(customMetadata);
+ return this;
+ }
+
+ public LedgerMetadataBuilder withCreationTime(long ctime) {
+ this.ctime = ctime;
+ return this;
+ }
+
+ public LedgerMetadataBuilder storingCreationTime(boolean storing) {
+ this.storeCtime = storing;
+ return this;
+ }
+
public LedgerMetadata build() {
- return new LedgerMetadata(ensembleSize, writeQuorumSize, ackQuorumSize,
+ checkArgument(ensembleSize >= writeQuorumSize, "Write quorum must be
less or equal to ensemble size");
+ checkArgument(writeQuorumSize >= ackQuorumSize, "Write quorum must be
greater or equal to ack quorum");
+
+ return new LedgerMetadata(metadataFormatVersion,
+ ensembleSize, writeQuorumSize, ackQuorumSize,
state, lastEntryId, length, ensembles,
- digestType, password, ctime, customMetadata);
+ digestType, password, ctime, storeCtime,
+ customMetadata);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index b5bea64..306cf50 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -18,7 +18,6 @@
package org.apache.bookkeeper.meta;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -26,6 +25,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 694ae64..35658dd 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -21,12 +21,12 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -441,7 +441,7 @@ public class MSLedgerManagerFactory extends
AbstractZkLedgerManagerFactory {
}
try {
LedgerMetadata metadata = LedgerMetadata.parseConfig(
- value.getValue().getField(META_FIELD),
Optional.<Long>absent());
+ value.getValue().getField(META_FIELD),
Optional.empty());
promise.complete(new Versioned<>(metadata,
value.getVersion()));
} catch (IOException e) {
LOG.error("Could not parse ledger metadata for ledger
" + ledgerId + " : ", e);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index 2347685..843ec9b 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -114,7 +114,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+ .withWriteQuorumSize(2).newEnsembleEntry(0L,
Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
@@ -180,8 +180,8 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
- Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
+ .withWriteQuorumSize(2).newEnsembleEntry(0L,
Lists.newArrayList(b0, b1)).build();
+ Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new
AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop1 = new
MetadataUpdateLoop(
@@ -233,7 +233,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(2)
- .newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
+ .withWriteQuorumSize(2).newEnsembleEntry(0L,
Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new
AtomicReference<>(writtenMetadata);
@@ -341,6 +341,7 @@ public class MetadataUpdateLoopTest {
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
LedgerMetadata initMeta =
LedgerMetadataBuilder.create().withEnsembleSize(1)
+ .withWriteQuorumSize(1).withAckQuorumSize(1)
.newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 9682eb6..9571d1e 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -673,7 +673,8 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
final AtomicInteger rcHolder = new AtomicInteger(-1234);
final CountDownLatch doneLatch = new CountDownLatch(1);
- new ReadLastConfirmedOp(readLh, bkc.getBookieClient(),
readLh.getCurrentEnsemble(),
+ new ReadLastConfirmedOp(readLh, bkc.getBookieClient(),
+
readLh.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@Override
public void readLastConfirmedDataComplete(int rc,
DigestManager.RecoveryData data) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 71b229d..fb56385 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -39,8 +39,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
+import com.google.common.collect.Lists;
import java.time.Duration;
-import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -49,11 +50,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
@@ -111,12 +113,18 @@ public class AbstractZkLedgerManagerTest extends
MockZooKeeperTestCase {
withSettings()
.useConstructor(conf, mockZk)
.defaultAnswer(CALLS_REAL_METHODS));
- this.metadata = new LedgerMetadata(
- 5, 3, 3,
- DigestType.CRC32,
- new byte[0],
- Collections.emptyMap(),
- false);
+ List<BookieSocketAddress> ensemble = Lists.newArrayList(
+ new BookieSocketAddress("192.0.2.1", 3181),
+ new BookieSocketAddress("192.0.2.2", 3181),
+ new BookieSocketAddress("192.0.2.3", 3181),
+ new BookieSocketAddress("192.0.2.4", 3181),
+ new BookieSocketAddress("192.0.2.5", 3181));
+ this.metadata = LedgerMetadataBuilder.create()
+ .withEnsembleSize(5)
+ .withWriteQuorumSize(3)
+ .withAckQuorumSize(3)
+ .newEnsembleEntry(0L, ensemble)
+ .withCreationTime(12345L).build();
doAnswer(invocationOnMock -> {
long ledgerId = invocationOnMock.getArgument(0);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index baf3d00..182ffa1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
@@ -62,12 +63,13 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.bookie.StateManager;
import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -93,6 +95,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
*/
private void createLedgers(int numLedgers, final Set<Long> createdLedgers)
throws IOException {
final AtomicInteger expected = new AtomicInteger(numLedgers);
+ List<BookieSocketAddress> ensemble = Lists.newArrayList(new
BookieSocketAddress("192.0.2.1", 1234));
+
for (int i = 0; i < numLedgers; i++) {
getLedgerIdGenerator().generateLedgerId(new
GenericCallback<Long>() {
@Override
@@ -107,8 +111,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
return;
}
- getLedgerManager().createLedgerMetadata(ledgerId,
- new
LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()))
+ LedgerMetadata md = LedgerMetadataBuilder.create()
+
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
+ .newEnsembleEntry(0L, ensemble).build();
+
+ getLedgerManager().createLedgerMetadata(ledgerId, md)
.whenComplete((result, exception) -> {
if (exception == null) {
activeLedgers.put(ledgerId, true);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index 1110407..a8b7e86 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -19,10 +19,9 @@
*/
package org.apache.bookkeeper.meta;
-import com.google.common.base.Optional;
-
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -83,7 +82,7 @@ public class MockLedgerManager implements LedgerManager {
if (pair == null) {
return null;
} else {
- return new Versioned<>(LedgerMetadata.parseConfig(pair.getRight(),
Optional.absent()), pair.getLeft());
+ return new Versioned<>(LedgerMetadata.parseConfig(pair.getRight(),
Optional.empty()), pair.getLeft());
}
}
diff --git
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index 8bd6684..8c183ab 100644
---
a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++
b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -31,10 +31,10 @@ import com.coreos.jetcd.op.CmpTarget;
import com.coreos.jetcd.options.DeleteOption;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.PutOption;
-import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -67,7 +67,7 @@ class EtcdLedgerManager implements LedgerManager {
try {
return LedgerMetadata.parseConfig(
bs.getBytes(),
- Optional.absent()
+ Optional.empty()
);
} catch (IOException ioe) {
log.error("Could not parse ledger metadata : {}",
bs.toStringUtf8(), ioe);
@@ -223,7 +223,7 @@ class EtcdLedgerManager implements LedgerManager {
KeyValue kv = getResp.getKvs().get(0);
byte[] data = kv.getValue().getBytes();
try {
- LedgerMetadata metadata =
LedgerMetadata.parseConfig(data, Optional.absent());
+ LedgerMetadata metadata =
LedgerMetadata.parseConfig(data, Optional.empty());
promise.complete(new Versioned<>(metadata, new
LongVersion(kv.getModRevision())));
} catch (IOException ioe) {
log.error("Could not parse ledger metadata for ledger
: {}", ledgerId, ioe);
diff --git
a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
index 4e21483..da33165 100644
---
a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
+++
b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -95,11 +96,16 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
@Test
public void testLedgerCRUD() throws Exception {
long ledgerId = System.currentTimeMillis();
- LedgerMetadata metadata = new LedgerMetadata(
- 3, 3, 2,
- DigestType.CRC32C,
- "test-password".getBytes(UTF_8)
- );
+ List<BookieSocketAddress> ensemble = Lists.newArrayList(
+ new BookieSocketAddress("192.0.2.1", 1234),
+ new BookieSocketAddress("192.0.2.2", 1234),
+ new BookieSocketAddress("192.0.2.3", 1234));
+ LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
+ .withPassword("test-password".getBytes(UTF_8))
+ .withDigestType(DigestType.CRC32C.toApiDigestType())
+ .newEnsembleEntry(0L, ensemble)
+ .build();
// ledger doesn't exist: read
try {