Z1Wu opened a new issue, #6469: URL: https://github.com/apache/kyuubi/issues/6469
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### Search before asking

- [X] I have searched in the [issues](https://github.com/apache/kyuubi/issues?q=is%3Aissue) and found no similar issues.

### Describe the bug

# Problem

When fetching a large result set using the feature introduced by https://github.com/apache/kyuubi/issues/5377, the driver runs out of memory (OOM) if too many `RecordReaderIterator[OrcStruct]` instances are instantiated in memory at the same time.

In the current implementation, the number of `RecordReaderIterator[OrcStruct]` instances is determined by the number of ORC files written by the insert command. That number is hard to control when running with AQE or other configurations, and all of the iterators are created up front when `OrcFileIterator` is constructed:

https://github.com/apache/kyuubi/blob/95ed74821c3a2d3a3f402033de7b463966a4bc28/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala#L77-L79

Simple script to reproduce the problem:

```sh
# Connect to the Kyuubi server and enable fetching results from files via FetchOrcStatement
beeline -u 'jdbc:hive2://<kyuubi-server>:<port>/?spark.executor.instances=40;kyuubi.operation.result.saveToFile.enabled=true;kyuubi.operation.result.saveToFile.minSize=1;spark.submit.deployMode=client;spark.driver.memory=5g;spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;' -e 'select * from tpcds.sf3000.catalog_returns;'
```

Error stack:

```
Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: java.lang.OutOfMemoryError: Java heap space
	at org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
	at org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
	at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
	at org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
	at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
	at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
	at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
	at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
	at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
	at org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown Source)
	at scala.collection.immutable.List.foreach(List.scala:431)
....
```

Heap dump when the OOM occurs: _(screenshot not included in this message)_

The heap dump shows that when a `RecordReaderIterator` is initialized, the `OrcMapreduceRecordReader` inside it pre-fetches rows, which occupy substantial memory. The stack trace is consistent with this: the `RecordReaderImpl` constructor calls `advanceToNextRow`, which reads a whole stripe (`readStripe` → `StripePlanner.readData`) before the client has requested any row.

# Possible Solution

Instead of initializing every `RecordReaderIterator` when `OrcFileIterator` is created, we can initialize each `RecordReaderIterator` lazily, so that the driver holds only one `RecordReaderIterator` in memory at a time: the one reading the file the client is currently fetching.
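The lazy-initialization idea above can be illustrated in plain Scala without Spark or ORC. This is a hypothetical sketch, not the Kyuubi implementation: `FakeReader` stands in for `RecordReaderIterator[OrcStruct]` (its constructor plays the role of the eager stripe read), and `OpenTracker`, `OrcIteration`, `eager` and `lazily` are made-up names. `eager` mirrors building every iterator up front with `map`, while `lazily` relies on `Iterator.flatMap`, which only invokes its function once the previous inner iterator is exhausted.

```scala
// Hypothetical sketch: plain Scala stand-ins for the ORC readers, no Spark/ORC needed.

// Hypothetical helper that tracks how many readers are open at once.
final class OpenTracker {
  var open: Int = 0
  var maxOpen: Int = 0
}

// Hypothetical stand-in for RecordReaderIterator[OrcStruct]. Constructing it
// counts as "opening" the file, like the eager stripe read in the stack trace.
final class FakeReader(rows: Seq[String], tracker: OpenTracker)
    extends Iterator[String] with AutoCloseable {
  tracker.open += 1
  tracker.maxOpen = math.max(tracker.maxOpen, tracker.open)

  private val underlying = rows.iterator
  private var closed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) close() // release the reader as soon as its file is exhausted
    more
  }

  override def next(): String = underlying.next()

  override def close(): Unit =
    if (!closed) {
      closed = true
      tracker.open -= 1
    }
}

object OrcIteration {
  // Eager: one reader per file is constructed immediately, similar to
  // mapping over all files in OrcFileIterator's constructor.
  def eager(files: Seq[Seq[String]], tracker: OpenTracker): Iterator[String] = {
    val iters: Seq[Iterator[String]] = files.map(f => new FakeReader(f, tracker))
    iters.iterator.flatten
  }

  // Lazy: Iterator.flatMap calls the function only when the previous inner
  // iterator is exhausted, so at most one reader is open at any time.
  def lazily(files: Seq[Seq[String]], tracker: OpenTracker): Iterator[String] =
    files.iterator.flatMap(f => new FakeReader(f, tracker))
}
```

With three input files, `eager` has three readers open before the first row is consumed, while `lazily` opens none until the first `hasNext` call and never more than one afterwards. The sketch also closes each reader as soon as it is exhausted, which Spark's `RecordReaderIterator` likewise does in `hasNext`.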
### Affects Version(s)

master

### Kyuubi Server Log Output

_No response_

### Kyuubi Engine Log Output

```logtalk
Error: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: java.lang.OutOfMemoryError: Java heap space
	at org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
	at org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
	at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
	at org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
	at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
	at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
	at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
	at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
	at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
	at org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown Source)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
	at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.<init>(FetchOrcStatement.scala:79)
	at org.apache.kyuubi.engine.spark.operation.FetchOrcStatement.getIterator(FetchOrcStatement.scala:63)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.collectAsIterator(ExecuteStatement.scala:199)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:99)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$Lambda$3260/474749226.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:166)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation$$Lambda$3261/410226299.apply(Unknown Source)
	at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:202)
	at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:432)
	at org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:52)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:190)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:183)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:104)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:115)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
	at org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
	at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
	at org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
	at org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
	at org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
	at org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
	at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
	at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
	at org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown Source)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
	at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.kyuubi.engine.spark.operation.OrcFileIterator.<init>(FetchOrcStatement.scala:79)
	at org.apache.kyuubi.engine.spark.operation.FetchOrcStatement.getIterator(FetchOrcStatement.scala:63)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.collectAsIterator(ExecuteStatement.scala:199)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:99)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$Lambda$3260/474749226.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:166)
	at org.apache.kyuubi.engine.spark.operation.SparkOperation$$Lambda$3261/410226299.apply(Unknown Source)
	at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
	at org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:135)
	at org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:173)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) (state=,code=0)
```

### Kyuubi Server Configurations

_No response_

### Kyuubi Engine Configurations

_No response_
### Additional context

_No response_

### Are you willing to submit PR?

- [X] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- [ ] No. I cannot submit a PR at this time.
