Vikram Ahuja created HIVE-29632:
-----------------------------------

             Summary: hive.fetch.task.caching=true (default) causes unbounded 
heap allocation on non-ACID tables, crashing HiveServer2 with OutOfMemoryError
                 Key: HIVE-29632
                 URL: https://issues.apache.org/jira/browse/HIVE-29632
             Project: Hive
          Issue Type: Improvement
    Affects Versions: 4.0.1, 4.0.0
            Reporter: Vikram Ahuja
            Assignee: Vikram Ahuja


  \{{hive.fetch.task.caching}} defaults to \{{true}} in Hive 4. When a query 
qualifies for fetch task optimization (\{{hive.fetch.task.conversion=more}}),
  \{{FetchTask.execute()}} pre-loads the entire result set into a \{{List}} in 
JVM heap before serving any rows to the client. On a non-ACID table with no 
LIMIT clause, the row limit falls back to \{{Integer.MAX_VALUE}}, so it
  attempts to load every row, causing \{{OutOfMemoryError}} on any table whose 
deserialized result set exceeds the available heap.

  The threshold guard (\{{hive.fetch.task.conversion.threshold=200MB}}) does 
not protect against this because it compares compressed on-disk bytes against 
the threshold,
  not JVM heap cost after deserialization. An ORC/Parquet file of 150 MB on 
disk can expand to 30+ GB of Java \{{String}} objects in heap.

  The feature was introduced in 
[HIVE-25976|https://issues.apache.org/jira/browse/HIVE-25976] specifically to 
prevent Hive Cleaner race conditions on transactional (ACID)
  tables, where files can be deleted mid-fetch by the Cleaner. It has no 
benefit for non-ACID tables as the Cleaner does not operate on them, yet it is 
applied
  unconditionally to all table types.

 

  h3. What hive.fetch.task.caching=true does

  When enabled, \{{SimpleFetchOptimizer}} sets \{{cachingEnabled=true}} on the 
\{{FetchTask}}. After Tez/MR execution completes, \{{Driver}} calls 
\{{fetchTask.execute()}},
  which calls \{{executeInner(fetchedData)}} with:

  \{code:java}
  // FetchTask.java
  public int execute() {
    if (cachingEnabled) {
      executeInner(fetchedData);  // loads ALL rows before serving any
    }
    return 0;
  }

  private boolean executeInner(List target) {
    int rowsRet;
    if (cachingEnabled) {
      rowsRet = work.getLimit() >= 0 ? work.getLimit() : Integer.MAX_VALUE;
    }
    // ...
    while (sink.getNumRows() < rowsRet) {
      fetch.pushRow();  // reads every row from HDFS into fetchedData
    }
  }
  \{code}

  Each row is serialized to a tab-separated \{{java.lang.String}} by 
\{{DefaultFetchFormatter}} before being stored in \{{fetchedData: ArrayList}}. 
All ORC/Parquet
  optimizations (dictionary encoding, RLE, columnar layout, block compression) 
are discarded. Repeated values (e.g. a country code column with 10 distinct 
values across
  40M rows) become 40M separate \{{String}} objects with no sharing.

  h3. Memory amplification

  ||Representation||Size||
  |ORC/Parquet compressed on disk|~150 MB|
  |Decompressed raw bytes|~3 GB|
  |Java String objects in heap (tab-separated, no compression, no dictionary 
sharing)|~34 GB|

  The ~200x amplification causes \{{OutOfMemoryError}} on any heap smaller than 
the fully-deserialized result set.
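
As a rough illustration of why cached rows cost far more heap than their on-disk size suggests, the sketch below estimates the retained size of rows held as Strings in a list. The layout constants are assumptions (64-bit HotSpot with compressed oops and compact Latin-1 strings), and the row count and row width are hypothetical inputs rather than measurements from this issue. The result is a lower bound: it ignores transient objects created during deserialization.

```java
// Rough lower bound on the heap retained by rows cached as Strings in a list.
// All layout constants are assumptions (64-bit HotSpot, compressed oops,
// compact Latin-1 strings); they are not taken from the Hive source.
final class CachedRowHeapEstimate {
    static final long STRING_OBJECT_BYTES = 24; // String header + fields, aligned
    static final long ARRAY_HEADER_BYTES = 16;  // byte[] header including length
    static final long LIST_SLOT_BYTES = 4;      // compressed reference in the list

    // Object sizes are rounded up to a multiple of 8 bytes.
    static long alignTo8(long n) {
        return (n + 7) / 8 * 8;
    }

    // Retained bytes for one cached row of the given character count.
    static long bytesPerRow(int latin1Chars) {
        long backingArray = alignTo8(ARRAY_HEADER_BYTES + latin1Chars);
        return STRING_OBJECT_BYTES + backingArray + LIST_SLOT_BYTES;
    }

    static long totalBytes(long rows, int avgCharsPerRow) {
        return rows * bytesPerRow(avgCharsPerRow);
    }

    public static void main(String[] args) {
        long rows = 1_000_000L; // hypothetical row count
        int avgChars = 100;     // hypothetical average row length
        System.out.println("bytes per row: " + bytesPerRow(avgChars));
        System.out.println("total bytes:   " + totalBytes(rows, avgChars));
    }
}
```

The estimate depends only on row count and text width, not on how well the source file compresses, which is why on-disk size is a poor predictor of heap cost.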

 

  h3. Broken threshold guard

  \{{SimpleFetchOptimizer.checkThresholdWithMetastoreStats()}} uses 
\{{StatsSetupConst.TOTAL_SIZE}} from HMS stats, which is the compressed file 
size on disk:

  \{code:java}
  // SimpleFetchOptimizer.java - FetchData.checkThresholdWithMetastoreStats()
  long dataSize = StatsUtils.getTotalSize(table);  // compressed bytes!
  status = (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
  \{code}

  A 150 MB ORC file passes the default 200 MB threshold check, caching is 
enabled, and 34 GB of String objects flood the heap. The \{{HiveConf}} javadoc 
for
  \{{HIVE_FETCH_TASK_CACHING}} acknowledges this: "the 
hive.fetch.task.conversion.threshold must be adjusted accordingly. That is 
200MB by default which must be lowered in
  case of enabled caching" — but this is never enforced in code.
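
The effect of the size input on the guard can be shown with a minimal standalone sketch. It reproduces only the comparison quoted above; the class and method names are hypothetical, and the sizes are the approximate figures from the memory amplification table.

```java
// Standalone sketch of the threshold comparison; not Hive code.
// Class and method names are hypothetical.
final class ThresholdCheckSketch {
    static final long MB = 1024L * 1024L;
    static final long DEFAULT_THRESHOLD = 200 * MB; // 209715200 bytes

    // Same comparison as the quoted check: pass when dataSize <= threshold.
    static boolean passes(long threshold, long dataSize) {
        return (threshold - dataSize) >= 0;
    }

    public static void main(String[] args) {
        long compressedOnDisk = 150 * MB;     // ~150 MB, as in the table
        long decompressedRaw = 3 * 1024 * MB; // ~3 GB, as in the table

        // The compressed size passes the check, so caching is allowed.
        System.out.println("compressed passes: "
            + passes(DEFAULT_THRESHOLD, compressedOnDisk));
        // The decompressed size of the same data would be rejected.
        System.out.println("raw passes:        "
            + passes(DEFAULT_THRESHOLD, decompressedRaw));
    }
}
```

The same data is accepted or rejected depending purely on which size statistic feeds the comparison.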

  h3. Retention amplifies impact

  With \{{hive.server2.idle.operation.timeout=2h}} (default), unclosed JDBC 
operations retain \{{fetchedData}} in heap for up to 2 hours. Multiple 
concurrent large queries
  cause additive pressure. There is no soft/weak reference, no memory-pressure 
eviction, and no size cap on \{{fetchedData}}.

  h3. Why non-ACID tables have no need for caching

  The feature was designed for transactional tables only. The Hive Cleaner 
compacts delta files and deletes old base/delta files. For a slow JDBC client 
fetching 10K rows
  at a time over minutes, the Cleaner can delete files mid-fetch, causing 
\{{FileNotFoundException}}. Pre-loading into RAM solves this.

  For non-ACID tables (managed, external, ORC, Parquet, Iceberg), the Cleaner 
never runs. There is no race condition to prevent. Caching on these tables 
provides zero
  benefit while introducing unbounded heap allocation.

  h2. Steps to Reproduce

  h3. Setup

  \{code:sql}
  -- Low-cardinality Parquet table (a high compression ratio is essential
  -- to stay under the threshold)
  CREATE TABLE transactions (
    txn_id BIGINT, acct_id STRING, mrch_id STRING, txn_amt DOUBLE,
    txn_dt STRING, ctry_cd STRING, prod_cd STRING,
    status_cd STRING, channel_cd STRING, proc_cd STRING
  )
  STORED AS PARQUET
  TBLPROPERTIES ("parquet.compression"="SNAPPY");
  \{code}

  Generate 4M rows with low-cardinality values (20 account IDs, 10 merchant 
IDs, 4 status codes, etc.) so Parquet+Snappy compresses to ~30 MB for 4M rows. 
Insert 4 times
  to reach 16M rows (~120 MB on disk total).
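
One possible way to produce such data is a small generator that writes tab-separated rows matching the column order of the table above. This is only a sketch: the value formats, and the cardinalities of columns the description does not specify, are invented for illustration. Loading the output into the Parquet table (for example through a text staging table) is left to the reader.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical generator for low-cardinality test rows; not part of Hive.
final class LowCardinalityRows {
    // Builds one tab-separated row in the column order of the CREATE TABLE.
    static String row(long i) {
        return String.join("\t",
            Long.toString(i),                                // txn_id
            "ACCT" + (i % 20),                               // 20 account IDs
            "MRCH" + (i % 10),                               // 10 merchant IDs
            Double.toString((i % 5) * 10.0),                 // txn_amt
            "2024-01-" + String.format("%02d", 1 + i % 7),   // txn_dt
            "CC" + (i % 10),                                 // ctry_cd
            "P" + (i % 5),                                   // prod_cd
            "S" + (i % 4),                                   // 4 status codes
            "CH" + (i % 3),                                  // channel_cd
            "PR" + (i % 6));                                 // proc_cd
    }

    public static void main(String[] args) throws IOException {
        // Row count comes from the first argument; defaults to 10 for a quick look.
        long count = args.length > 0 ? Long.parseLong(args[0]) : 10L;
        try (BufferedWriter out = new BufferedWriter(
                new OutputStreamWriter(System.out, StandardCharsets.UTF_8))) {
            for (long i = 0; i < count; i++) {
                out.write(row(i));
                out.newLine();
            }
        }
    }
}
```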

  \{code:sql}
  -- Verify file size stays under 200MB threshold (caching will be enabled)
  -- hdfs dfs -du -s -h /warehouse/.../transactions/
  -- Expected: ~120 MB

  -- Trigger OOM
  SELECT * FROM transactions;
  \{code}

  h3. HiveServer2 configuration

  \{noformat}
  -Xmx6g
  hive.fetch.task.conversion=more
  hive.fetch.task.caching=true          (default in Hive 4)
  hive.fetch.task.conversion.threshold=209715200  (200MB default)
  \{noformat}

  h3. Observed GC pattern before crash

  \{noformat}
  [GC pause (G1 Evacuation Pause)]  heap: 2048M->2040M(6144M)
  [GC pause (G1 Evacuation Pause)]  heap: 4096M->4090M(6144M)
  [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
  [Full GC (Allocation Failure)]    heap: 6140M->6140M(6144M)  <- zero freed
  java.lang.OutOfMemoryError: Java heap space
  \{noformat}

  h2. Expected Behavior

  - \{{SELECT * FROM non_acid_table}} completes successfully, streaming rows 
batch-by-batch to the client
  - Heap usage remains bounded during fetch; GC can reclaim memory between 
queries
  - \{{hive.fetch.task.caching}} only affects transactional (ACID) tables, 
which are the only table type for which the Cleaner race condition exists

  h2. Actual Behavior

  - \{{FetchTask}} pre-loads all rows into \{{fetchedData: ArrayList}} before 
returning any rows
  - Heap fills to 100%; G1GC enters a death spiral of Full GC with zero bytes 
freed
  - HiveServer2 crashes with \{{java.lang.OutOfMemoryError: Java heap space}}

  h2. Fix

  The fix is a small guard in \{{SimpleFetchOptimizer.optimize()}} that 
disables caching for non-transactional tables:

  \{code:java}
  // SimpleFetchOptimizer.java
  boolean cachingEnabled = HiveConf.getBoolVar(pctx.getConf(),
      HiveConf.ConfVars.HIVE_FETCH_TASK_CACHING);
  if (cachingEnabled && !AcidUtils.isTransactionalTable(fetch.table)) {
    LOG.debug("Fetch task caching is enabled but table {} is not transactional. "
        + "Caching is only supported for ACID tables to prevent Cleaner race "
        + "conditions. Disabling.",
        fetch.table.getCompleteName());
    cachingEnabled = false;
  }
  fetchTask.setCachingEnabled(cachingEnabled);
  \{code}

  This preserves the original HIVE-25976 intent (caching for ACID tables) while 
eliminating the OOM risk for all other table types.

  h3. Additional issues not addressed by this fix (follow-up work)

  - Threshold uses compressed disk size: \{{checkThresholdWithMetastoreStats()}} 
should use \{{RAW_DATA_SIZE}} or \{{numRows}} × estimated row size instead of 
\{{TOTAL_SIZE}}.
  - No LIMIT guard: when caching is enabled and \{{work.getLimit() < 0}}, the 
fetch should fall back to streaming instead of setting 
\{{rowsRet = Integer.MAX_VALUE}}.
  - Default should be false: \{{HIVE_FETCH_TASK_CACHING}} defaults to \{{true}}; 
it should default to \{{false}} and require explicit opt-in.
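
To make the idea of a size cap concrete, here is a minimal sketch of a bounded row buffer. It is not Hive code: the class name, the per-row cost estimate, and the overflow contract are all assumptions for illustration. Rows are accepted until an estimated byte budget would be exceeded; after that the caller is expected to serve the buffered rows and stream the remainder. Note that falling back to streaming bounds heap usage but does not by itself protect ACID reads from the Cleaner race.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-capped row buffer; names and cost model are illustrative only.
final class BoundedRowBuffer {
    private static final long PER_ROW_OVERHEAD = 64; // assumed fixed cost per row

    private final long maxBytes;
    private final List<String> rows = new ArrayList<>();
    private long estimatedBytes;
    private boolean overflowed;

    BoundedRowBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Returns true if the row was buffered. Returns false once the budget
    // would be exceeded; the rejected row is NOT stored, so the caller must
    // hand it to the client directly and stream everything after it.
    boolean offer(String row) {
        if (overflowed) {
            return false;
        }
        long cost = PER_ROW_OVERHEAD + row.length();
        if (estimatedBytes + cost > maxBytes) {
            overflowed = true;
            return false;
        }
        rows.add(row);
        estimatedBytes += cost;
        return true;
    }

    boolean hasOverflowed() {
        return overflowed;
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        BoundedRowBuffer buffer = new BoundedRowBuffer(140);
        System.out.println(buffer.offer("abcd")); // buffered
        System.out.println(buffer.offer("abcd")); // buffered
        System.out.println(buffer.offer("abcd")); // rejected: budget exceeded
        System.out.println("buffered rows: " + buffer.size());
    }
}
```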

  h2. Environment

  - Hive 4.0.x
  - Java 17
  - G1GC
  - Feature introduced in HIVE-25976

  h2. Workaround

  Set in \{{hive-site.xml}} and restart HiveServer2:

  \{code:xml}
  <property>
    <name>hive.fetch.task.caching</name>
    <value>false</value>
  </property>
  \{code}


