InvisibleProgrammer commented on code in PR #5174:
URL: https://github.com/apache/hive/pull/5174#discussion_r1547456451


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java:
##########
@@ -154,6 +165,90 @@ private void prepare(InputInitializerContext 
initializerContext) throws IOExcept
     LOG.info("SplitLocationProvider: " + splitLocationProvider);
   }
 
+  /**
+   * SplitSerializer is a helper class for taking care of serializing splits 
to the tez scratch dir
+   * when a size criteria defined by 
"hive.tez.split.fs.serialization.threshold" is met.
+   * It utilizes an ExecutorService for parallel writes to prevent a single 
split write operation
+   * becoming the bottleneck (as write() is called from a loop currently).
+   */
+  class SplitSerializer {
+    // fields needed for filepath
+    private String queryId;
+    private String inputName;
+    private int vertexId;
+    private Path appStagingPath;
+    // metrics
+    private AtomicInteger timeSpentWithSplitWriteMs = new AtomicInteger(0);
+    private AtomicInteger splitsWritten = new AtomicInteger(0);
+    // lazy initialized filesystem and executor
+    private FileSystem fs;
+    private ExecutorService executor;
+
+    /**
+     * Lazy init filesystem and executor service: don't initialize if there is 
no split serialized at all.
+     * No need to synchronize, this is called from a loop.
+     */
+    private void lazyInit() throws IOException {
+      if (fs != null) {
+        return;
+      }
+      queryId = jobConf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+      inputName = getContext().getInputName();
+      vertexId = getContext().getVertexId();
+      appStagingPath = TezCommonUtils.getTezSystemStagingPath(conf, 
getContext().getApplicationId().toString());
+
+      fs = appStagingPath.getFileSystem(jobConf);
+      executor = Executors.newFixedThreadPool(8,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("HiveSplitGenerator.SplitSerializer Thread - " + 
"#%d").build());
+    }
+
+    private InputDataInformationEvent write(int count, MRSplitProto mrSplit) 
throws IOException {
+      lazyInit();
+
+      InputDataInformationEvent diEvent;
+      Path filePath = getSerializedFilePath(count);
+
+      // parallel writes for better performance (this is called from a loop)
+      executor.submit(() -> {
+        try {
+          long now = Time.monotonicNow();
+          try (FSDataOutputStream out = fs.create(filePath, false)) {
+            mrSplit.writeTo(out);
+            out.close();
+          }
+          splitsWritten.getAndIncrement();
+          long elapsed = Time.monotonicNow() - now;
+          timeSpentWithSplitWriteMs.getAndAdd((int) elapsed);
+          LOG.debug("Split #{} event to output path: {} written in {} ms", 
count, filePath, elapsed);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+      // return the event now, we'll wait for the actual file write in close() 
if needed
+      return InputDataInformationEvent.createWithSerializedPath(count, 
filePath.toString());
+    }
+
+    private Path getSerializedFilePath(int index) {
+      // e.g. staging_dir/events/queryid/inputtable_InputDataInformationEvent_0
+      return new 
Path(String.format("%s/events/%s/%d_%s_InputDataInformationEvent_%d", 
appStagingPath, queryId,
+          vertexId, inputName, index));
+    }
+
+    private void close() throws IOException {
+      if (fs != null) {
+        executor.shutdown();
+        try {
+          executor.awaitTermination(5, TimeUnit.MINUTES);

Review Comment:
   Is it common practice to use a hard-coded timeout here? Do we have a 
config option for this? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to