sumitagrawl commented on code in PR #5358:
URL: https://github.com/apache/ozone/pull/5358#discussion_r1336727694
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator
iterator,
boolean schemaV3)
throws IOException {
- if (fileName == null) {
- // Print to stdout
- return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
- }
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("DBScanner-%d")
+ .build();
+ ExecutorService threadPool = new ThreadPoolExecutor(
Review Comment:
The threadPool construction can be moved inside the method where it is used
and destroyed.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator
iterator,
boolean schemaV3)
throws IOException {
- if (fileName == null) {
- // Print to stdout
- return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
- }
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("DBScanner-%d")
+ .build();
+ ExecutorService threadPool = new ThreadPoolExecutor(
+ THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), factory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
- // Write to file output
- try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
- return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+ PrintWriter printWriter = null;
+ try {
+ if (fileName != null) {
+ printWriter = new PrintWriter(
+ new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+ } else {
+ printWriter = out();
+ }
+ return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+ schemaV3);
+ } finally {
+ if (printWriter != null) {
+ printWriter.close();
+ }
}
}
private boolean displayTable(ManagedRocksIterator iterator,
DBColumnFamilyDefinition dbColumnFamilyDef,
- PrintWriter out,
- boolean schemaV3)
- throws IOException {
+ PrintWriter printWriter,
+ ExecutorService threadPool, boolean schemaV3) {
+ LogWriter logWriter = new LogWriter(printWriter);
+ try {
+ // Start JSON object (map) or array
+ printWriter.print(withKey ? "{ " : "[ ");
+ logWriter.start();
+ processRecords(iterator, dbColumnFamilyDef, logWriter,
+ threadPool, schemaV3);
+ } catch (InterruptedException e) {
+ exception = true;
+ Thread.currentThread().interrupt();
+ } finally {
+ threadPool.shutdown();
+ try {
+ if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+ threadPool.shutdownNow();
+ err().println("Thread pool did not terminate; forced shutdown.");
+ }
+ } catch (InterruptedException ie) {
+ threadPool.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ logWriter.stop();
+ logWriter.join();
+ // End JSON object (map) or array
+ printWriter.println(withKey ? " }" : " ]");
+ }
+
+ return !exception;
+ }
+ private void processRecords(ManagedRocksIterator iterator,
+ DBColumnFamilyDefinition dbColumnFamilyDef,
+ LogWriter logWriter,
+ ExecutorService threadPool,
+ boolean schemaV3) throws InterruptedException
{
if (startKey != null) {
iterator.get().seek(getValueObject(dbColumnFamilyDef));
}
-
- if (withKey) {
- // Start JSON object (map)
- out.print("{ ");
- } else {
- // Start JSON array
- out.print("[ ");
- }
-
+ ArrayList<ByteArrayKeyValue> batch = new ArrayList<>(BATCH_SIZE);
+ // Used to ensure that the output of a multi-threaded parsed Json is in
+ // the same order as the RocksDB iterator.
+ long sequenceId = FIRST_SEQUENCE_ID;
// Count number of keys printed so far
long count = 0;
- while (withinLimit(count) && iterator.get().isValid()) {
- StringBuilder sb = new StringBuilder();
- if (withKey) {
- Object key = dbColumnFamilyDef.getKeyCodec()
- .fromPersistedFormat(iterator.get().key());
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- if (schemaV3) {
- int index =
- DatanodeSchemaThreeDBDefinition.getContainerKeyPrefixLength();
- String keyStr = key.toString();
- if (index > keyStr.length()) {
- err().println("Error: Invalid SchemaV3 table key length. "
- + "Is this a V2 table? Try again with --dn-schema=V2");
- return false;
- }
- String cid = keyStr.substring(0, index);
- String blockId = keyStr.substring(index);
- sb.append(gson.toJson(LongCodec.get().fromPersistedFormat(
- FixedLengthStringCodec.string2Bytes(cid)) +
- keySeparatorSchemaV3 +
- blockId));
- } else {
- sb.append(gson.toJson(key));
- }
- sb.append(": ");
- }
-
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- Object o = dbColumnFamilyDef.getValueCodec()
- .fromPersistedFormat(iterator.get().value());
- sb.append(gson.toJson(o));
-
+ List<Future<Void>> futures = new ArrayList<>();
+ while (withinLimit(count) && iterator.get().isValid() && !exception) {
+ batch.add(new ByteArrayKeyValue(
+ iterator.get().key(), iterator.get().value()));
iterator.get().next();
- ++count;
- if (withinLimit(count) && iterator.get().isValid()) {
- // If this is not the last entry, append comma
- sb.append(", ");
+ count++;
+ if (batch.size() >= BATCH_SIZE) {
+ //
+ while (logWriter.getInflightLogCount() > THREAD_COUNT * 10
Review Comment:
This check may not bound memory usage in all cases:
- Tasks added to the thread pool that are still in the queue or in execution
are not considered.
The check should consider both the Writer and the ThreadPool tasks.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator
iterator,
boolean schemaV3)
throws IOException {
- if (fileName == null) {
- // Print to stdout
- return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
- }
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("DBScanner-%d")
+ .build();
+ ExecutorService threadPool = new ThreadPoolExecutor(
+ THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), factory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
- // Write to file output
- try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
- return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+ PrintWriter printWriter = null;
+ try {
+ if (fileName != null) {
+ printWriter = new PrintWriter(
+ new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+ } else {
+ printWriter = out();
+ }
+ return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+ schemaV3);
+ } finally {
+ if (printWriter != null) {
+ printWriter.close();
+ }
}
}
private boolean displayTable(ManagedRocksIterator iterator,
DBColumnFamilyDefinition dbColumnFamilyDef,
- PrintWriter out,
- boolean schemaV3)
- throws IOException {
+ PrintWriter printWriter,
+ ExecutorService threadPool, boolean schemaV3) {
+ LogWriter logWriter = new LogWriter(printWriter);
+ try {
+ // Start JSON object (map) or array
+ printWriter.print(withKey ? "{ " : "[ ");
+ logWriter.start();
+ processRecords(iterator, dbColumnFamilyDef, logWriter,
+ threadPool, schemaV3);
+ } catch (InterruptedException e) {
+ exception = true;
+ Thread.currentThread().interrupt();
+ } finally {
+ threadPool.shutdown();
+ try {
+ if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+ threadPool.shutdownNow();
+ err().println("Thread pool did not terminate; forced shutdown.");
+ }
+ } catch (InterruptedException ie) {
+ threadPool.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ logWriter.stop();
Review Comment:
This should be done before "Thread.currentThread().interrupt()", as this may
throw a further exception. Alternatively, "Thread.currentThread().interrupt()"
must be the last statement in the finally block.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -176,88 +202,114 @@ private boolean displayTable(ManagedRocksIterator
iterator,
boolean schemaV3)
throws IOException {
- if (fileName == null) {
- // Print to stdout
- return displayTable(iterator, dbColumnFamilyDef, out(), schemaV3);
- }
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("DBScanner-%d")
+ .build();
+ ExecutorService threadPool = new ThreadPoolExecutor(
+ THREAD_COUNT, THREAD_COUNT, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), factory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
- // Write to file output
- try (PrintWriter out = new PrintWriter(fileName, UTF_8.name())) {
- return displayTable(iterator, dbColumnFamilyDef, out, schemaV3);
+ PrintWriter printWriter = null;
+ try {
+ if (fileName != null) {
+ printWriter = new PrintWriter(
+ new BufferedWriter(new PrintWriter(fileName, UTF_8.name())));
+ } else {
+ printWriter = out();
+ }
+ return displayTable(iterator, dbColumnFamilyDef, printWriter, threadPool,
+ schemaV3);
+ } finally {
+ if (printWriter != null) {
+ printWriter.close();
+ }
}
}
private boolean displayTable(ManagedRocksIterator iterator,
DBColumnFamilyDefinition dbColumnFamilyDef,
- PrintWriter out,
- boolean schemaV3)
- throws IOException {
+ PrintWriter printWriter,
+ ExecutorService threadPool, boolean schemaV3) {
+ LogWriter logWriter = new LogWriter(printWriter);
+ try {
+ // Start JSON object (map) or array
+ printWriter.print(withKey ? "{ " : "[ ");
+ logWriter.start();
+ processRecords(iterator, dbColumnFamilyDef, logWriter,
+ threadPool, schemaV3);
+ } catch (InterruptedException e) {
+ exception = true;
+ Thread.currentThread().interrupt();
+ } finally {
+ threadPool.shutdown();
+ try {
+ if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
Review Comment:
IMO, we can shut down the thread pool immediately and then shut down the
LogWriter. If we need to wait for tasks to complete, we can call Future.get
with a timeout.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java:
##########
@@ -364,4 +416,252 @@ private String removeTrailingSlashIfNeeded(String dbPath)
{
public Class<?> getParentType() {
return RDBParser.class;
}
+
+ /**
+ * Utility for centralized JSON serialization using Jackson.
+ */
+ @VisibleForTesting
+ public static class JsonSerializationHelper {
+ /**
+ * In order to maintain consistency with the original Gson output to do
+ * this setup makes the output from Jackson closely match the
+ * output of Gson.
+ */
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+ // Ignore standard getters.
+ .setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
+ // Ignore boolean "is" getters.
+ .setVisibility(PropertyAccessor.IS_GETTER,
+ JsonAutoDetect.Visibility.NONE)
+ // Exclude null values.
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ public static final ObjectWriter WRITER;
+
+ static {
+ if (compact) {
+ WRITER = OBJECT_MAPPER.writer();
+ } else {
+ WRITER = OBJECT_MAPPER.writerWithDefaultPrettyPrinter();
+ }
+ }
+
+ public static ObjectWriter getWriter() {
+ return WRITER;
+ }
+ }
+
+
+ private static class Task implements Callable<Void> {
+
+ private final DBColumnFamilyDefinition dbColumnFamilyDefinition;
+ private final ArrayList<ByteArrayKeyValue> batch;
+ private final LogWriter logWriter;
+ private static final ObjectWriter WRITER =
+ JsonSerializationHelper.getWriter();
+ private final long sequenceId;
+ private final boolean withKey;
+ private final boolean schemaV3;
+
+ Task(DBColumnFamilyDefinition dbColumnFamilyDefinition,
+ ArrayList<ByteArrayKeyValue> batch, LogWriter logWriter,
+ long sequenceId, boolean withKey, boolean schemaV3) {
+ this.dbColumnFamilyDefinition = dbColumnFamilyDefinition;
+ this.batch = batch;
+ this.logWriter = logWriter;
+ this.sequenceId = sequenceId;
+ this.withKey = withKey;
+ this.schemaV3 = schemaV3;
+ }
+
+ @Override
+ public Void call() {
+ try {
+ ArrayList<String> results = new ArrayList<>(batch.size());
+ for (ByteArrayKeyValue byteArrayKeyValue : batch) {
+ StringBuilder sb = new StringBuilder();
+ if (sequenceId == FIRST_SEQUENCE_ID && !results.isEmpty()) {
Review Comment:
The comment and the if condition do not match: for the first sequence there is
no need to append a comma, but the check seems to be reversed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]