This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 74da307  RATIS-541 Add option to verify correctness of batch and mix 
of batch and single updates data from LogService in VerificationTool (Rajeshbabu)
74da307 is described below

commit 74da30734d08b49946e84779b7485427a33620d7
Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla>
AuthorDate: Wed Jul 17 17:42:42 2019 +0530

    RATIS-541 Add option to verify correctness of batch and mix of batch and 
single updates data from LogService in VerificationTool (Rajeshbabu)
---
 .../ratis/logservice/tool/VerificationTool.java    | 126 ++++++++++++++++++---
 1 file changed, 113 insertions(+), 13 deletions(-)

diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
index 1adbcdb..aff80f1 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/tool/VerificationTool.java
@@ -35,33 +35,55 @@ import org.apache.ratis.logservice.api.LogReader;
 import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.server.LogStateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.IParameterValidator;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.validators.PositiveInteger;
 
+/**
+ * LogService utility to write data and validate its presence.
+ */
 public class VerificationTool {
+    /**
+     * Validator to check that the provided value is positive and non-zero.
+     */
+    public static class NonZeroPositiveInteger implements IParameterValidator {
+      @Override
+      public void validate(String name, String value) throws 
ParameterException {
+        int i = Integer.parseInt(value);
+        if (i < 1) {
+          throw new ParameterException("Parameter '" + name + "' must be 
positive and non-zero"
+              + ", but was " + value);
+        }
+      }
+    }
 
     public static final Logger LOG = 
LoggerFactory.getLogger(LogStateMachine.class);
 
     @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service 
Quorum", required = true)
     private String metaQuorum;
-    @Parameter(names = {"-nl", "--numLogs"}, description = "Number of logs")
+    @Parameter(names = {"-nl", "--numLogs"}, description = "Number of logs", 
validateWith = NonZeroPositiveInteger.class)
     private int numLogs = 10;
-    @Parameter(names = {"-nr", "--numRecords"}, description = "Number of 
records per log")
+    @Parameter(names = {"-nr", "--numRecords"}, description = "Number of 
records to write per log", validateWith = NonZeroPositiveInteger.class)
     private int numRecords = 1000;
     @Parameter(names = {"-w", "--write"}, description = "Write to the logs", 
arity = 1)
     private boolean write = true;
     @Parameter(names = {"-r", "--read"}, description = "Read the logs", arity 
= 1)
     private boolean read = true;
-    @Parameter(names = {"-l", "--logFrequency"}, description = "Print update 
every N operations")
+    @Parameter(names = {"-l", "--logFrequency"}, description = "Print update 
every N operations", validateWith = NonZeroPositiveInteger.class)
     private int logFrequency = 50;
     @Parameter(names = {"-h", "--help"}, description = "Help", help = true)
     private boolean help = false;
     @Parameter(names = {"-s", "--size"}, description = "Size in bytes of each 
value")
     private int recordSize = -1;
+    @Parameter(names = {"-bs", "--batchSize"}, description = "Number of 
records in a batch, a value of 0 disables batching", validateWith = 
PositiveInteger.class)
+    private int batchSize = 0;
 
     public static final String LOG_NAME_PREFIX = "testlog";
     public static final String MESSAGE_PREFIX = "message";
@@ -95,11 +117,27 @@ public class VerificationTool {
           for (int i = 0; i < tool.numLogs; i++) {
               LogName logName = getLogName(i);
               if (logsInSystem.contains(logName)) {
-                LOG.info("Deleting {}", logName);
-                client.deleteLog(logName);
+                  LOG.info("Deleting {}", logName);
+                  client.deleteLog(logName);
+              }
+          }
+
+          // First write batch entries to log.
+          if(tool.batchSize > 0) {
+              // Compute the number of batches to write given the batch size.
+              int numBatches = tool.numRecords / tool.batchSize;
+              for (int i = 0; i < tool.numLogs; i++) {
+                  BatchWriter writer = new BatchWriter(getLogName(i), client, 
tool.numRecords,
+                          tool.logFrequency, tool.recordSize, tool.batchSize, 
numBatches);
+                  futures.add(executor.submit(writer));
+              }
+          } else {
+              // Write single entries to log.
+              for (int i = 0; i < tool.numLogs; i++) {
+                  BulkWriter writer = new BulkWriter(getLogName(i), client, 
tool.numRecords,
+                          tool.logFrequency, tool.recordSize);
+                  futures.add(executor.submit(writer));
               }
-              BulkWriter writer = new BulkWriter(getLogName(i), client, 
tool.numRecords, tool.logFrequency, tool.recordSize);
-              futures.add(executor.submit(writer));
           }
           waitForCompletion(futures);
         }
@@ -108,7 +146,8 @@ public class VerificationTool {
           LOG.info("Executing parallel reads");
           futures = new ArrayList<Future<?>>(tool.numLogs);
           for (int i = 0; i < tool.numLogs; i++) {
-              BulkReader reader = new BulkReader(getLogName(i), client, 
tool.numRecords, tool.logFrequency, tool.recordSize);
+              BulkReader reader = new BulkReader(getLogName(i), client, 
tool.numRecords, tool.logFrequency,
+                      tool.recordSize);
               futures.add(executor.submit(reader));
           }
           waitForCompletion(futures);
@@ -211,18 +250,28 @@ public class VerificationTool {
 
         return new String(value, 0, dividerOffset, StandardCharsets.UTF_8);
       }
+
+      LogWriter getLogWriter() throws IOException {
+          LogStream logStream = null;
+          try {
+              logStream = this.client.getLog(logName);
+          } catch (LogNotFoundException e) {
+              LOG.info("Creating {}", logName);
+              logStream = this.client.createLog(logName);
+          }
+          return logStream.createWriter();
+      }
     }
 
     static class BulkWriter extends Operation {
-        BulkWriter(LogName logName, LogServiceClient client, int numRecords, 
int logFreq, int valueSize) {
+        BulkWriter(LogName logName, LogServiceClient client, int numRecords, 
int logFreq,
+                int valueSize) {
             super(logName, client, numRecords, logFreq, valueSize);
         }
 
         public void run() {
             try {
-                LOG.info("Creating {}", logName);
-                LogStream logStream = this.client.createLog(logName);
-                LogWriter writer = logStream.createWriter();
+                LogWriter writer = getLogWriter();
                 for (int i = 0; i < this.numRecords; i++) {
                     String message = MESSAGE_PREFIX + i;
                     if (i % logFreq == 0) {
@@ -231,7 +280,58 @@ public class VerificationTool {
                     writer.write(createValue(message));
                 }
                 writer.close();
-                LOG.info("{} log entries written to log {} successfully.", 
numRecords, logName);
+                LOG.info("{} entries written to {} successfully.", numRecords, 
logName);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    static class BatchWriter extends Operation {
+        private int batchSize;
+        private int numBatches;
+        BatchWriter(LogName logName, LogServiceClient client, int numRecords, 
int logFreq,
+                int valueSize, int batchSize, int numBatches) {
+            super(logName, client, numRecords, logFreq, valueSize);
+            this.batchSize = batchSize;
+            this.numBatches = numBatches;
+        }
+
+        public void run() {
+            try {
+                LogWriter writer = getLogWriter();
+                for(int i = 0; i < numBatches; i++) {
+                    List<ByteBuffer> messages = new 
ArrayList<ByteBuffer>(batchSize);
+                    for(int j = 0; j < batchSize; j++) {
+                        String message = MESSAGE_PREFIX + (i * batchSize + j);
+                        messages.add(createValue(message));
+                        if((i * batchSize + j) % logFreq == 0) {
+                            LOG.info(logName + " batching write " + message);
+                        }
+                    }
+                    try {
+                        writer.write(messages);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                // Catch the last bit that didn't evenly fit into the batch 
sizes
+                if (numRecords % batchSize != 0) {
+                  List<ByteBuffer> lastBatch = new ArrayList<>();
+                  for (int i = numBatches * batchSize; i < numRecords; i++) {
+                    String message = MESSAGE_PREFIX + i;
+                    lastBatch.add(createValue(message));
+                  }
+                  LOG.info(logName + " writing last mini-batch of " + 
lastBatch.size() + " records");
+                  try {
+                    writer.write(lastBatch);
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                }
+                LOG.info("{} entries written in batches to {} successfully.",
+                        numRecords, logName);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

Reply via email to