xichen01 commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1347691296


##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator 
iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();

Review Comment:
   Yes, `System.out` should not be closed; the code has been modified accordingly.



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +210,100 @@ private boolean displayTable(ManagedRocksIterator 
iterator,
                                boolean schemaV3)
       throws IOException {
 
-    if (fileName == null) {
-      // Print to stdout
-      return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
-    }
-
-    // Write to file output
-    try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
-      return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        printWriter = new PrintWriter(
+            new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+      } else {
+        printWriter = out();
+      }
+      return displayTable(iterator, dbColumnFamilyDef, printWriter,
+          schemaV3);
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
     }
   }
 
   private boolean displayTable(ManagedRocksIterator iterator,
                                DBColumnFamilyDefinition dbColumnFamilyDef,
-                               PrintWriter out,
-                               boolean schemaV3)
-      throws IOException {
+                               PrintWriter printWriter, boolean schemaV3) {
+    ThreadFactory factory = new ThreadFactoryBuilder()
+        .setNameFormat("DBScanner-%d")
+        .build();
+    ExecutorService threadPool = new ThreadPoolExecutor(
+        threadCount, threadCount, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(1024), factory,
+        new ThreadPoolExecutor.CallerRunsPolicy());
+    LogWriter logWriter = new LogWriter(printWriter);
+    try {
+      // Start JSON object (map) or array
+      printWriter.print(withKey ? "{ " : "[ ");
+      logWriter.start();
+      processRecords(iterator, dbColumnFamilyDef, logWriter,
+          threadPool, schemaV3);
+    } catch (InterruptedException e) {
+      exception = true;
+      Thread.currentThread().interrupt();
+    } finally {
+      threadPool.shutdownNow();
+      logWriter.stop();
+      logWriter.join();
+      // End JSON object (map) or array
+      printWriter.println(withKey ? " }" : " ]");
+    }
+    return !exception;
+  }
 
+  private void processRecords(ManagedRocksIterator iterator,
+                              DBColumnFamilyDefinition dbColumnFamilyDef,
+                              LogWriter logWriter, ExecutorService threadPool,
+                              boolean schemaV3) throws InterruptedException {
     if (startKey != null) {
       iterator.get().seek(getValueObject(dbColumnFamilyDef));
     }
-
-    if (withKey) {
-      // Start JSON object (map)
-      out.print("{ ");
-    } else {
-      // Start JSON array
-      out.print("[ ");
-    }
-
+    ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(batchSize);
+    // Used to ensure that the output of a multi-threaded parsed Json is in
+    // the same order as the RocksDB iterator.
+    long sequenceId = FIRST_SEQUENCE_ID;
     // Count number of keys printed so far
     long count = 0;
-    while (withinLimit(count) && iterator.get().isValid()) {
-      StringBuilder sb = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDef.getKeyCodec()
-            .fromPersistedFormat(iterator.get().key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        if (schemaV3) {
-          int index =
-              DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
-          String keyStr = key.toString();
-          if (index > keyStr.length()) {
-            err().println("Error: Invalid SchemaV3 table key length. "
-                + "Is this a V2 table? Try again with --dn-schema=V2");
-            return false;
-          }
-          String cid = keyStr.substring(0, index);
-          String blockId = keyStr.substring(index);
-          sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
-              FixedLengthStringCodec.string2Bytes(cid)) +
-              keySeparatorSchemaV3 +
-              blockId));
-        } else {
-          sb.append(gson.toJson(key));
-        }
-        sb.append(": ");
-      }
-
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      Object o = dbColumnFamilyDef.getValueCodec()
-          .fromPersistedFormat(iterator.get().value());
-      sb.append(gson.toJson(o));
-
+    List<Future<Void>> futures = new ArrayList<>();
+    while (withinLimit(count) && iterator.get().isValid() && !exception) {
+      batch.add(new ByteArrayKeyValue(
+          iterator.get().key(), iterator.get().value()));
       iterator.get().next();
-      ++count;
-      if (withinLimit(count) && iterator.get().isValid()) {
-        // If this is not the last entry, append comma
-        sb.append(", ");
+      count++;
+      if (batch.size() >= batchSize) {
+        while (logWriter.getInflightLogCount() > threadCount * 10L

Review Comment:
   There are two places here that will control the queue length.
   1. The `Task` queue will be blocked by the thread pool's `LinkedBlockingQueue`.
   2. This `getInflightLogCount()` is used to control the number of 
`WriterTask`.
   
   It is also possible to use `InflightLogCount` for full-link control. In that
case we would need to increase `InflightLogCount` here and decrease
`InflightLogCount` after `printWriter.println()`. What do you think?



##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +410,253 @@ private String removeTrailingSlashIfNeeded(String dbPath) 
{
   public Class<?> getParentType() {
     return RDBParser.class;
   }
+
+  /**
+   * Utility for centralized JSON serialization using Jackson.
+   */
+  @VisibleForTesting
+  public static class JsonSerializationHelper {
+    /**
+     * In order to maintain consistency with the original Gson output to do
+     * this setup makes the output from Jackson closely match the
+     * output of Gson.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+        .registerModule(new JavaTimeModule())
+        .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+        // Ignore standard getters.
+        .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+        // Ignore boolean "is" getters.
+        .setVisibility(PropertyAccessor.IS_GETTER,
+            JsonAutoDetect.Visibility.NONE)
+        // Exclude null values.
+        .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    public static final ObjectWriter WRITER;
+
+    static {
+      if (compact) {
+        WRITER = OBJECT_MAPPER.writer();
+      } else {
+        WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+      }
+    }
+
+    public static ObjectWriter getWriter() {
+      return WRITER;
+    }
+  }
+
+
+  private static class Task implements Callable<Void> {
+
+    private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+    private final ArrayList<ByteArrayKeyValue> batch;
+    private final LogWriter logWriter;
+    private static final ObjectWriter WRITER =
+        JsonSerializationHelper.getWriter();
+    private final long sequenceId;
+    private final boolean withKey;
+    private final boolean schemaV3;
+
+    Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+         ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+         long sequenceId, boolean withKey, boolean schemaV3) {
+      this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+      this.batch = batch;
+      this.logWriter = logWriter;
+      this.sequenceId = sequenceId;
+      this.withKey = withKey;
+      this.schemaV3 = schemaV3;
+    }
+
+    @Override
+    public Void call() {
+      try {
+        ArrayList<String> results = new ArrayList<>(batch.size());
+        for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+          StringBuilder sb = new StringBuilder();
+          if (!(sequenceId == FIRST_SEQUENCE_ID && results.isEmpty())) {
+            // Add a comma before each output entry, starting from the second
+            // one, to ensure valid JSON format.
+            sb.append(", ");
+          }
+          if (withKey) {
+            Object key = dbColumnFamilyDefinition.getKeyCodec()
+                .fromPersistedFormat(byteArrayKeyValue.getKey());
+            if (schemaV3) {
+              int index =
+                  
DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
+              String keyStr = key.toString();
+              if (index > keyStr.length()) {
+                err().println("Error: Invalid SchemaV3 table key length. "
+                    + "Is this a V2 table? Try again with --dn-schema=V2");
+                exception = true;
+                break;
+              }
+              String cid = key.toString().substring(0, index);
+              String blockId = key.toString().substring(index);
+              sb.append(WRITER.writeValueAsString(LongCodec.get()
+                  .fromPersistedFormat(
+                      FixedLengthStringCodec.string2Bytes(cid)) +
+                  KEY_SEPARATOR_SCHEMA_V3 + blockId));
+            } else {
+              sb.append(WRITER.writeValueAsString(key));
+            }
+            sb.append(": ");
+          }
+
+          Object o = dbColumnFamilyDefinition.getValueCodec()
+              .fromPersistedFormat(byteArrayKeyValue.getValue());
+          sb.append(WRITER.writeValueAsString(o));
+          results.add(sb.toString());
+        }
+        logWriter.log(results, sequenceId);
+      } catch (Exception e) {
+        exception = true;
+        LOG.error("Exception parse Object", e);
+      }
+      return null;
+    }
+  }
+
+  private static class ByteArrayKeyValue {
+    private final byte[] key;
+    private final byte[] value;
+
+    ByteArrayKeyValue(byte[] key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public byte[] getKey() {
+      return key;
+    }
+
+    public byte[] getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ByteArrayKeyValue that = (ByteArrayKeyValue) o;
+      return Arrays.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(key);
+    }
+
+    @Override
+    public String toString() {
+      return "ByteArrayKeyValue{" +
+          "key=" + Arrays.toString(key) +
+          ", value=" + Arrays.toString(value) +
+          '}';
+    }
+  }
+
+  private static class LogWriter {
+    private final Map<Long, ArrayList<String>> logs;
+    private final PrintWriter printWriter;
+    private final Thread writerThread;
+    private volatile boolean stop = false;
+    private long expectedSequenceId = FIRST_SEQUENCE_ID;
+    private final Object lock = new Object();
+    private final AtomicLong inflightLogCount = new AtomicLong();
+
+    LogWriter(PrintWriter printWriter) {
+      this.logs = new HashMap<>();
+      this.printWriter = printWriter;
+      this.writerThread = new Thread(new WriterTask());
+    }
+
+    void start() {
+      writerThread.start();
+    }
+
+    public void log(ArrayList<String> msg, long sequenceId) {
+      synchronized (lock) {
+        if (!stop) {
+          logs.put(sequenceId, msg);
+          inflightLogCount.incrementAndGet();
+          lock.notify();
+        }
+      }
+    }
+
+    private final class WriterTask implements Runnable {
+      public void run() {
+        try {
+          while (!stop) {
+            synchronized (lock) {
+              // The sequenceId is incrementally generated as the RocksDB
+              // iterator. Thus, based on the sequenceId, we can strictly ensure
+              // that the output order here is consistent with the order of the
+              // RocksDB iterator.
+              // Note that the order here not only requires the sequenceId to be
+              // incremental, but also demands that the sequenceId of the
+              // next output is the current sequenceId + 1.
+              ArrayList<String> results = logs.get(expectedSequenceId);
+              if (results != null) {
+                for (String result : results) {
+                  printWriter.println(result);
+                }
+                inflightLogCount.decrementAndGet();
+                logs.remove(expectedSequenceId);
+                // sequenceId of the next output must be the current
+                // sequenceId + 1
+                expectedSequenceId++;
+              } else {
+                lock.wait(1000);
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Exception while output", e);
+        } finally {
+          stop = true;
+          drainRemainingMessages();

Review Comment:
   Yes, this was a missing synchronization; I have added a lock for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to