This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 74da307 RATIS-541 Add option to verify correctness of batch and mix
of batch and single updates data from LogService in VerificationTool (Rajeshbabu)
74da307 is described below
commit 74da30734d08b49946e84779b7485427a33620d7
Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla>
AuthorDate: Wed Jul 17 17:42:42 2019 +0530
RATIS-541 Add option to verify correctness of batch and mix of batch and
single updates data from LogService in VerificationTool (Rajeshbabu)
---
.../ratis/logservice/tool/VerificationTool.java | 126 ++++++++++++++++++---
1 file changed, 113 insertions(+), 13 deletions(-)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
index 1adbcdb..aff80f1 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
@@ -35,33 +35,55 @@ import org.apache.ratis.logservice.api.LogReader;
import org.apache.ratis.logservice.api.LogStream;
import org.apache.ratis.logservice.api.LogWriter;
import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.common.LogNotFoundException;
import org.apache.ratis.logservice.server.LogStateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.validators.PositiveInteger;
+/**
+ * LogService utility to write data and validate its presence.
+ */
public class VerificationTool {
+ /**
+ * Validator to check that the provided value is positive and non-zero.
+ */
+ public static class NonZeroPositiveInteger implements IParameterValidator {
+ @Override
+ public void validate(String name, String value) throws
ParameterException {
+ int i = Integer.parseInt(value);
+ if (i < 1) {
+ throw new ParameterException("Parameter '" + name + "' must be
positive and non-zero"
+ + ", but was " + value);
+ }
+ }
+ }
public static final Logger LOG =
LoggerFactory.getLogger(LogStateMachine.class);
@Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service
Quorum", required = true)
private String metaQuorum;
- @Parameter(names = {"-nl", "--numLogs"}, description = "Number of logs")
+ @Parameter(names = {"-nl", "--numLogs"}, description = "Number of logs",
validateWith = NonZeroPositiveInteger.class)
private int numLogs = 10;
- @Parameter(names = {"-nr", "--numRecords"}, description = "Number of
records per log")
+ @Parameter(names = {"-nr", "--numRecords"}, description = "Number of
records to write per log", validateWith = NonZeroPositiveInteger.class)
private int numRecords = 1000;
@Parameter(names = {"-w", "--write"}, description = "Write to the logs",
arity = 1)
private boolean write = true;
@Parameter(names = {"-r", "--read"}, description = "Read the logs", arity
= 1)
private boolean read = true;
- @Parameter(names = {"-l", "--logFrequency"}, description = "Print update
every N operations")
+ @Parameter(names = {"-l", "--logFrequency"}, description = "Print update
every N operations", validateWith = NonZeroPositiveInteger.class)
private int logFrequency = 50;
@Parameter(names = {"-h", "--help"}, description = "Help", help = true)
private boolean help = false;
@Parameter(names = {"-s", "--size"}, description = "Size in bytes of each
value")
private int recordSize = -1;
+ @Parameter(names = {"-bs", "--batchSize"}, description = "Number of
records in a batch, a value of 0 disables batching", validateWith =
PositiveInteger.class)
+ private int batchSize = 0;
public static final String LOG_NAME_PREFIX = "testlog";
public static final String MESSAGE_PREFIX = "message";
@@ -95,11 +117,27 @@ public class VerificationTool {
for (int i = 0; i < tool.numLogs; i++) {
LogName logName = getLogName(i);
if (logsInSystem.contains(logName)) {
- LOG.info("Deleting {}", logName);
- client.deleteLog(logName);
+ LOG.info("Deleting {}", logName);
+ client.deleteLog(logName);
+ }
+ }
+
+ // First write batch entries to log.
+ if(tool.batchSize > 0) {
+ // Compute the number of batches to write given the batch size.
+ int numBatches = tool.numRecords / tool.batchSize;
+ for (int i = 0; i < tool.numLogs; i++) {
+ BatchWriter writer = new BatchWriter(getLogName(i), client,
tool.numRecords,
+ tool.logFrequency, tool.recordSize, tool.batchSize,
numBatches);
+ futures.add(executor.submit(writer));
+ }
+ } else {
+ // Write single entries to log.
+ for (int i = 0; i < tool.numLogs; i++) {
+ BulkWriter writer = new BulkWriter(getLogName(i), client,
tool.numRecords,
+ tool.logFrequency, tool.recordSize);
+ futures.add(executor.submit(writer));
}
- BulkWriter writer = new BulkWriter(getLogName(i), client,
tool.numRecords, tool.logFrequency, tool.recordSize);
- futures.add(executor.submit(writer));
}
waitForCompletion(futures);
}
@@ -108,7 +146,8 @@ public class VerificationTool {
LOG.info("Executing parallel reads");
futures = new ArrayList<Future<?>>(tool.numLogs);
for (int i = 0; i < tool.numLogs; i++) {
- BulkReader reader = new BulkReader(getLogName(i), client,
tool.numRecords, tool.logFrequency, tool.recordSize);
+ BulkReader reader = new BulkReader(getLogName(i), client,
tool.numRecords, tool.logFrequency,
+ tool.recordSize);
futures.add(executor.submit(reader));
}
waitForCompletion(futures);
@@ -211,18 +250,28 @@ public class VerificationTool {
return new String(value, 0, dividerOffset, StandardCharsets.UTF_8);
}
+
+ LogWriter getLogWriter() throws IOException {
+ LogStream logStream = null;
+ try {
+ logStream = this.client.getLog(logName);
+ } catch (LogNotFoundException e) {
+ LOG.info("Creating {}", logName);
+ logStream = this.client.createLog(logName);
+ }
+ return logStream.createWriter();
+ }
}
static class BulkWriter extends Operation {
- BulkWriter(LogName logName, LogServiceClient client, int numRecords,
int logFreq, int valueSize) {
+ BulkWriter(LogName logName, LogServiceClient client, int numRecords,
int logFreq,
+ int valueSize) {
super(logName, client, numRecords, logFreq, valueSize);
}
public void run() {
try {
- LOG.info("Creating {}", logName);
- LogStream logStream = this.client.createLog(logName);
- LogWriter writer = logStream.createWriter();
+ LogWriter writer = getLogWriter();
for (int i = 0; i < this.numRecords; i++) {
String message = MESSAGE_PREFIX + i;
if (i % logFreq == 0) {
@@ -231,7 +280,58 @@ public class VerificationTool {
writer.write(createValue(message));
}
writer.close();
- LOG.info("{} log entries written to log {} successfully.",
numRecords, logName);
+ LOG.info("{} entries written to {} successfully.", numRecords,
logName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ static class BatchWriter extends Operation {
+ private int batchSize;
+ private int numBatches;
+ BatchWriter(LogName logName, LogServiceClient client, int numRecords,
int logFreq,
+ int valueSize, int batchSize, int numBatches) {
+ super(logName, client, numRecords, logFreq, valueSize);
+ this.batchSize = batchSize;
+ this.numBatches = numBatches;
+ }
+
+ public void run() {
+ try {
+ LogWriter writer = getLogWriter();
+ for(int i = 0; i < numBatches; i++) {
+ List<ByteBuffer> messages = new
ArrayList<ByteBuffer>(batchSize);
+ for(int j = 0; j < batchSize; j++) {
+ String message = MESSAGE_PREFIX + (i * batchSize + j);
+ messages.add(createValue(message));
+ if((i * batchSize + j) % logFreq == 0) {
+ LOG.info(logName + " batching write " + message);
+ }
+ }
+ try {
+ writer.write(messages);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Catch the last bit that didn't evenly fit into the batch
sizes
+ if (numRecords % batchSize != 0) {
+ List<ByteBuffer> lastBatch = new ArrayList<>();
+ for (int i = numBatches * batchSize; i < numRecords; i++) {
+ String message = MESSAGE_PREFIX + i;
+ lastBatch.add(createValue(message));
+ }
+ LOG.info(logName + " writing last mini-batch of " +
lastBatch.size() + " records");
+ try {
+ writer.write(lastBatch);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOG.info("{} entries written in batches to {} successfully.",
+ numRecords, logName);
} catch (IOException e) {
throw new RuntimeException(e);
}