This is an automated email from the ASF dual-hosted git repository.

daijy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new b2433c3ef5f HIVE-27600: Reduce filesystem calls in OrcFileMergeOperator (#4579) b2433c3ef5f is described below commit b2433c3ef5faf1a69f5e29643a4a8d12c5528934 Author: yigress <104102129+yigr...@users.noreply.github.com> AuthorDate: Sat Aug 19 22:16:28 2023 -0700 HIVE-27600: Reduce filesystem calls in OrcFileMergeOperator (#4579) --- .../hadoop/hive/ql/exec/OrcFileMergeOperator.java | 34 ++++++++-------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index 5fec9c8a1f3..cb538c4a708 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.orc.Writer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,8 +61,8 @@ public class OrcFileMergeOperator extends private Map<Integer, Writer> outWriters = new HashMap<>(); private Path prevPath; - private Reader reader; private FSDataInputStream fdis; + private ObjectInspector obi; /** Kryo ctor. 
*/ protected OrcFileMergeOperator() { @@ -110,10 +111,16 @@ public class OrcFileMergeOperator extends if (prevPath == null) { prevPath = k.getInputPath(); } - if (reader == null) { - reader = OrcFile.createReader(fs, k.getInputPath()); - LOG.info("ORC merge file input path: " + k.getInputPath()); + if (obi == null) { + Reader reader = OrcFile.createReader(fs, prevPath); + obi = reader.getObjectInspector(); + try { + reader.close(); + } catch (IOException e) { + throw new HiveException(String.format("Unable to close reader for %s", filePath), e); + } } + LOG.info("ORC merge file input path: " + k.getInputPath()); // store the orc configuration from the first file. All other files should // match this configuration before merging else will not be merged @@ -133,7 +140,7 @@ public class OrcFileMergeOperator extends .compress(compression) .version(fileVersion) .rowIndexStride(rowIndexStride) - .inspector(reader.getObjectInspector()); + .inspector(obi); // compression buffer size should only be set if compression is enabled if (compression != CompressionKind.NONE) { // enforce is required to retain the buffer sizes of old files instead of orc writer @@ -154,14 +161,6 @@ public class OrcFileMergeOperator extends return; } - // next file in the path - if (!k.getInputPath().equals(prevPath)) { - if (reader != null) { - reader.close(); - } - reader = OrcFile.createReader(fs, k.getInputPath()); - } - // initialize buffer to read the entire stripe byte[] buffer = new byte[(int) v.getStripeInformation().getLength()]; fdis = fs.open(k.getInputPath()); @@ -193,15 +192,6 @@ public class OrcFileMergeOperator extends if (exception) { closeOp(true); } - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - throw new HiveException(String.format("Unable to close reader for %s", filePath), e); - } finally { - reader = null; - } - } if (fdis != null) { try { fdis.close();