danny0405 commented on a change in pull request #2496:
URL: https://github.com/apache/hudi/pull/2496#discussion_r567627556



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
##########
@@ -54,6 +54,10 @@ public TaskContextSupplier getTaskContextSupplier() {
     return taskContextSupplier;
   }
 
+  public void setConfig(String name, String value) {
+    getHadoopConf().get().set(name, value);
+  }

Review comment:
       Should `setConfig` be renamed to `setHadoopConf`?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -118,12 +156,31 @@ private static Registry getMetricRegistryForPath(Path p) {
     return executeFuncWithTimeMetrics(metricName, p, func);
   }
 
+  /**
+   * Executes the given function and updates the metric with time taken to execute the function and number of bytes.
+   *
+   * The number of bytes are returned from the execution of the provided function.
+   */
+  protected static int executeFuncWithTimeAndByteMetrics(String metricName, Path p, CheckedIntFunction func) throws IOException {
+    int ret = executeFuncWithTimeMetrics(metricName, p, func);
+    if (ret > 0) {
+      Registry registry = getMetricRegistryForPath(p);
+      if (registry != null) {

Review comment:
       The code invokes `getMetricRegistryForPath(p)` twice.
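A minimal sketch of resolving the registry once and reusing it for both the timing and the byte metric. `Registry`, `CheckedIntFunction`, the map-backed lookup and the metric names are hypothetical stand-ins (the real Hudi types take a Hadoop `Path` and may differ), and the timing is inlined here rather than delegated to `executeFuncWithTimeMetrics`:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class ByteMetricsSketch {

  /** Hypothetical stand-in for Hudi's metrics Registry: named long counters. */
  static class Registry {
    final Map<String, Long> counters = new HashMap<>();

    void add(String name, long value) {
      counters.merge(name, value, Long::sum);
    }
  }

  /** Hypothetical mirror of CheckedIntFunction: an I/O action returning a byte count. */
  @FunctionalInterface
  interface CheckedIntFunction {
    int get() throws IOException;
  }

  static final Map<String, Registry> REGISTRIES = new HashMap<>();
  // Counts lookups so the single-lookup property can be checked.
  static final AtomicInteger LOOKUPS = new AtomicInteger();

  /** Stand-in for getMetricRegistryForPath(p); returns null when no registry exists. */
  static Registry getMetricRegistryForPath(String path) {
    LOOKUPS.incrementAndGet();
    return REGISTRIES.get(path);
  }

  static int executeFuncWithTimeAndByteMetrics(String metricName, String path,
      CheckedIntFunction func) throws IOException {
    // Resolve the registry once; both the timing and the byte metric reuse it.
    Registry registry = getMetricRegistryForPath(path);
    long startNanos = System.nanoTime();
    int ret = func.get();
    if (registry != null) {
      registry.add(metricName + ".totalDuration", System.nanoTime() - startNanos);
      if (ret > 0) {
        registry.add(metricName + ".totalBytes", ret);
      }
    }
    return ret;
  }
}
```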

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -419,13 +417,8 @@ public static boolean isLogFile(Path logPath) {
    * Get the names of all the base and log files in the given partition path.
    */
   public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath) throws IOException {
-    final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
-        .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
-    final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
-
     return Arrays.stream(fs.listStatus(partitionPath, path -> {
-      String extension = FSUtils.getFileExtension(path.getName());
-      return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
+      return HoodieFileFormat.isBaseFile(path) || isLogFile(path);
     })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
   }

Review comment:
       I guess `FileStatus::isFile` is always true if `HoodieFileFormat.isBaseFile(path) || isLogFile(path)` is true?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information

Review comment:
       Why is the class still named `TimedFSInputStream` when the number of bytes is also recorded?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -192,27 +261,74 @@ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwr
     return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
       final Path translatedPath = convertToDefaultPath(f);
       return wrapOutputStream(f,
-          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress), bufferSize);
     });
   }
 
-  private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream)
+
+  /**
+   * The stream hierarchy after wrapping will be as follows.
+   *
+   *  FSDataOutputStream (returned)
+   *      BufferedOutputStream (required for output buffering)
+   *          TimedSizeAwareOutputStream  (required for tracking metrics, timings and the number of bytes written)
+   *               fs.open()   (Original stream returned from underlying FileSystem)
+   */
+  private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream, int bufferSize)
       throws IOException {
-    if (fsDataOutputStream instanceof SizeAwareFSDataOutputStream) {
-      return fsDataOutputStream;
+    TimedSizeAwareOutputStream tos = new TimedSizeAwareOutputStream(path, fsDataOutputStream, consistencyGuard,
+        () -> openStreams.remove(path.getName()));
+
+    if (ioBufferingEnabled) {
+      int minBufferSize = (HoodieFileFormat.isBaseFile(path) || FSUtils.isLogFile(path)) ? minDataFileIOBufferSizeBytes
+          : minFileIOBufferSizeBytes;

Review comment:
       Can we add a method such as `FsUtil.isDataFile(Path)` to encapsulate
   ```java
   HoodieFileFormat.isBaseFile(path) || FSUtils.isLogFile(path)
   ```
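A rough sketch of the suggested helper, together with the wrapping order described in the Javadoc above. File names stand in for Hadoop `Path`s, the extension list is simplified, `CountingOutputStream` plays the role of `TimedSizeAwareOutputStream`, and applying the minimum via `Math.max` is an assumption about how `minBufferSize` is used; the outer `FSDataOutputStream` layer is omitted to keep it dependency-free:

```java
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class DataFileBufferingSketch {

  // Hypothetical, simplified list; Hudi's real check is HoodieFileFormat.isBaseFile(Path).
  private static final String[] BASE_FILE_EXTENSIONS = {".parquet", ".hfile"};
  // Simplified log-file check mirroring the removed getName().contains(logFileExtension) test.
  private static final String LOG_FILE_MARKER = ".log";

  static boolean isBaseFile(String fileName) {
    for (String ext : BASE_FILE_EXTENSIONS) {
      if (fileName.endsWith(ext)) {
        return true;
      }
    }
    return false;
  }

  static boolean isLogFile(String fileName) {
    return fileName.contains(LOG_FILE_MARKER);
  }

  /** The proposed helper: a single name for "base file or log file". */
  static boolean isDataFile(String fileName) {
    return isBaseFile(fileName) || isLogFile(fileName);
  }

  /** Assumption: the per-type minimum acts as a floor on the requested buffer size. */
  static int effectiveBufferSize(String fileName, int requested, int minDataFileBytes, int minOtherFileBytes) {
    int minBufferSize = isDataFile(fileName) ? minDataFileBytes : minOtherFileBytes;
    return Math.max(requested, minBufferSize);
  }

  /** Stand-in for TimedSizeAwareOutputStream: counts bytes handed to the raw stream. */
  static class CountingOutputStream extends FilterOutputStream {
    long bytesWritten;

    CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      bytesWritten++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      bytesWritten += len;
    }
  }

  /** Buffering on top, counting in the middle, raw stream at the bottom. */
  static OutputStream wrap(String fileName, CountingOutputStream counting, int requested,
      int minDataFileBytes, int minOtherFileBytes) {
    int size = effectiveBufferSize(fileName, requested, minDataFileBytes, minOtherFileBytes);
    return new BufferedOutputStream(counting, size);
  }
}
```

Because the counting layer sits below the buffer, it only sees bytes when the buffer flushes, so byte metrics lag small writes until `flush()` or `close()`.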

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -79,15 +79,12 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
       this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) ((
               (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
-    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
-      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-      // need to wrap in another BufferedFSInputStream the make bufferSize work?
       this.inputStream = fsDataInputStream;
     }
 
+    LOG.error("Opened inputstream of type " + this.inputStream.getClass().getName() + "  wrapping over "

Review comment:
       +1, why is this logged at error level?
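If the message is purely diagnostic, a debug-level log is the usual choice. A sketch using `java.util.logging` to stay self-contained (Hudi's own code would use its logging framework's debug level instead); the class name `StreamOpenLogging` is hypothetical, and the message is shortened because the rest of the original statement is cut off above:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class StreamOpenLogging {
  static final Logger LOG = Logger.getLogger(StreamOpenLogging.class.getName());

  /** Logs which stream type was opened, at debug (FINE) level instead of error. */
  static void logOpened(Object inputStream) {
    // The guard skips building the message entirely when FINE is disabled.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("Opened input stream of type " + inputStream.getClass().getName());
    }
  }
}
```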



