This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b805961e05 feat(flink): Off-heap lookup join cache backed by RocksDB 
(#18231)
f7b805961e05 is described below

commit f7b805961e050dc5cd325cc85cc7d367fff4d045
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Feb 23 10:41:18 2026 +0700

    feat(flink): Off-heap lookup join cache backed by RocksDB (#18231)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    |  19 +++
 .../apache/hudi/table/lookup/HeapLookupCache.java  |  60 +++++++
 .../hudi/table/lookup/HoodieLookupFunction.java    |  56 +++++--
 .../org/apache/hudi/table/lookup/LookupCache.java  |  60 +++++++
 .../hudi/table/lookup/RocksDBLookupCache.java      | 181 +++++++++++++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  28 ++--
 6 files changed, 382 insertions(+), 22 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 62b496378958..a7b33421e72b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -1306,6 +1306,25 @@ public class FlinkOptions extends HoodieConfig {
           .defaultValue(16)
           .withDescription("The thread number for lookup async.");
 
+  public static final ConfigOption<String> LOOKUP_JOIN_CACHE_TYPE =
+      key("lookup.join.cache.type")
+          .stringType()
+          .defaultValue("heap")
+          .withDescription("The storage backend for the lookup join cache. "
+              + "Possible values: 'heap' (default) stores all dimension-table 
rows in JVM heap memory "
+              + "(may cause OutOfMemoryError for large tables); "
+              + "'rocksdb' stores rows off-heap in an embedded RocksDB 
instance on local disk, "
+              + "which avoids OOM at the cost of additional serialization 
overhead.");
+
+  public static final ConfigOption<String> LOOKUP_JOIN_ROCKSDB_PATH =
+      key("lookup.join.rocksdb.path")
+          .stringType()
+          .defaultValue(System.getProperty("java.io.tmpdir") + 
"/hudi-lookup-rocksdb")
+          .withDescription("Local directory path for storing RocksDB data when 
"
+              + "'lookup.join.cache.type' is set to 'rocksdb'. "
+              + "Each task manager will create a unique subdirectory under 
this path. "
+              + "The directory is cleaned up when the lookup function is 
closed.");
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
new file mode 100644
index 000000000000..455a6432fd81
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Heap-based {@link LookupCache} backed by a {@link HashMap}.
+ *
+ * <p>Suitable for small-to-medium dimension tables. For large tables, use
+ * {@link RocksDBLookupCache} to avoid JVM OutOfMemoryError.
+ */
+public class HeapLookupCache implements LookupCache {
+
+  private final Map<RowData, List<RowData>> store = new HashMap<>();
+
+  @Override
+  public void addRow(RowData key, RowData row) {
+    store.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+  }
+
+  @Override
+  @Nullable
+  public List<RowData> getRows(RowData key) {
+    return store.get(key);
+  }
+
+  @Override
+  public void clear() {
+    store.clear();
+  }
+
+  @Override
+  public void close() {
+    store.clear();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index 6663eefd01a7..443cba0bcf50 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -38,21 +39,25 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Lookup function for Hoodie dimension table.
  *
  * <p>Note: reference Flink FileSystemLookupFunction to avoid additional 
connector jar dependencies.
+ *
+ * <p>The underlying cache can be heap-based ({@code 
lookup.join.cache.type=heap}, default) or
+ * RocksDB-backed ({@code lookup.join.cache.type=rocksdb}). The RocksDB option 
stores all dimension
+ * table rows off-heap on local disk, preventing OutOfMemoryError when the 
dimension table is large.
  */
 @Slf4j
-public class HoodieLookupFunction  extends LookupFunction implements 
Serializable, Closeable {
+public class HoodieLookupFunction extends LookupFunction implements 
Serializable, Closeable {
 
   private static final long serialVersionUID = 1L;
 
@@ -67,9 +72,10 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
   private final Duration reloadInterval;
   private final TypeSerializer<RowData> serializer;
   private final RowType rowType;
+  private final int[] lookupKeys;
 
   // cache for lookup data
-  private transient Map<RowData, List<RowData>> cache;
+  private transient LookupCache cache;
   // timestamp when cache expires
   private transient long nextLoadTime;
 
@@ -86,6 +92,7 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
       Configuration conf) {
     this.partitionReader = partitionReader;
     this.rowType = rowType;
+    this.lookupKeys = lookupKeys;
     this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
     for (int i = 0; i < lookupKeys.length; i++) {
       lookupFieldGetters[i] =
@@ -99,7 +106,7 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
   @Override
   public void open(FunctionContext context) throws Exception {
     functionContext = context;
-    cache = new HashMap<>();
+    cache = createCache();
     nextLoadTime = -1L;
     org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
     metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
@@ -109,13 +116,13 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
   public Collection<RowData> lookup(RowData keyRow) {
     try {
       checkCacheReload();
-      return cache.get(keyRow);
+      return cache.getRows(keyRow);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  private void checkCacheReload() {
+  private void checkCacheReload() throws IOException {
     if (nextLoadTime > System.currentTimeMillis()) {
       return;
     }
@@ -151,10 +158,10 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
           count++;
           RowData rowData = serializer.copy(row);
           RowData key = extractLookupKey(rowData);
-          List<RowData> rows = cache.computeIfAbsent(key, k -> new 
ArrayList<>());
-          rows.add(rowData);
+          cache.addRow(key, rowData);
         }
         partitionReader.close();
+        currentCommit = latestCommitInstant.get();
         nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
         log.info("Loaded {} row(s) into lookup join cache", count);
         return;
@@ -188,7 +195,34 @@ public class HoodieLookupFunction  extends LookupFunction 
implements Serializabl
 
   @Override
   public void close() {
-    // no operation
+    if (cache != null) {
+      try {
+        cache.close();
+      } catch (Exception e) {
+        log.warn("Failed to close lookup cache", e);
+      }
+      cache = null;
+    }
+  }
+
+  private LookupCache createCache() {
+    String cacheType = conf.get(FlinkOptions.LOOKUP_JOIN_CACHE_TYPE);
+    if ("rocksdb".equalsIgnoreCase(cacheType)) {
+      String rocksDbPath = conf.get(FlinkOptions.LOOKUP_JOIN_ROCKSDB_PATH);
+      log.info("Creating RocksDB lookup cache at {}", rocksDbPath);
+      RowType keyRowType = buildKeyRowType();
+      TypeSerializer<RowData> keySerializer = 
InternalSerializers.create(keyRowType);
+      return new RocksDBLookupCache(keySerializer, serializer, rocksDbPath);
+    }
+    log.info("Creating heap lookup cache");
+    return new HeapLookupCache();
+  }
+
+  private RowType buildKeyRowType() {
+    List<RowType.RowField> keyFields = Arrays.stream(lookupKeys)
+        .mapToObj(i -> rowType.getFields().get(i))
+        .collect(Collectors.toList());
+    return new RowType(keyFields);
   }
 
   @VisibleForTesting
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
new file mode 100644
index 000000000000..93e99c913e7c
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Cache abstraction for lookup join dimension table data.
+ *
+ * <p>Implementations may store data on-heap (e.g. {@link HeapLookupCache}) or 
off-heap
+ * (e.g. {@link RocksDBLookupCache}) to avoid JVM OutOfMemoryError on large 
tables.
+ */
+public interface LookupCache extends Closeable {
+
+  /**
+   * Adds a single row to the cache under the given lookup key.
+   * Multiple rows may be associated with the same key.
+   *
+   * @param key   the lookup key row (contains only the join key fields)
+   * @param row   the full dimension table row
+   * @throws IOException if the write fails
+   */
+  void addRow(RowData key, RowData row) throws IOException;
+
+  /**
+   * Returns all rows matching the given lookup key, or {@code null} / empty 
list if none exist.
+   *
+   * @param key the lookup key row
+   * @return matching rows, or {@code null} if not found
+   */
+  @Nullable
+  List<RowData> getRows(RowData key) throws IOException;
+
+  /**
+   * Clears all entries from the cache so it can be reloaded.
+   */
+  void clear() throws IOException;
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
new file mode 100644
index 000000000000..4bb41dfcc9a6
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Off-heap {@link LookupCache} backed by RocksDB.
+ *
+ * <p>Data is stored on disk / in the OS page cache, keeping JVM heap usage 
minimal. Each lookup
+ * key maps to one or more full dimension-table rows. The compound RocksDB key 
encodes the lookup
+ * key bytes in hexadecimal plus a monotonic counter so that multiple rows 
sharing the same lookup
+ * key can coexist in the store without overwriting each other. Retrieval is a 
prefix scan on the
+ * hex-encoded lookup key.
+ *
+ * <p>Keys are serialized with Flink's {@link TypeSerializer} for the key 
{@link
+ * org.apache.flink.table.types.logical.RowType}, and values (full rows) are 
serialized with the
+ * row-level {@link TypeSerializer}. The raw bytes are stored as-is in RocksDB 
via a
+ * pass-through {@link CustomSerializer} — no additional Java-serialization 
overhead.
+ */
+@Slf4j
+public class RocksDBLookupCache implements LookupCache {
+
+  private static final String COLUMN_FAMILY = "lookup_cache";
+
+  /** Separator between the hex-encoded key and the row counter in the 
compound RocksDB key. */
+  private static final String KEY_SEPARATOR = "_";
+
+  private final TypeSerializer<RowData> keySerializer;
+  private final TypeSerializer<RowData> rowSerializer;
+  private final String rocksDbBasePath;
+
+  /** Reusable serialization buffer — single-threaded use only. */
+  private final DataOutputSerializer keyOutputBuffer = new 
DataOutputSerializer(64);
+  private final DataOutputSerializer rowOutputBuffer = new 
DataOutputSerializer(256);
+  private final DataInputDeserializer rowInputBuffer = new 
DataInputDeserializer();
+
+  private RocksDBDAO rocksDBDAO;
+  private long rowCounter;
+
+  public RocksDBLookupCache(
+      TypeSerializer<RowData> keySerializer,
+      TypeSerializer<RowData> rowSerializer,
+      String rocksDbBasePath) {
+    this.keySerializer = keySerializer;
+    this.rowSerializer = rowSerializer;
+    this.rocksDbBasePath = rocksDbBasePath;
+    this.rocksDBDAO = createDAO();
+    this.rowCounter = 0L;
+  }
+
+  @Override
+  public void addRow(RowData key, RowData row) throws IOException {
+    String keyHex = serializeKeyToHex(key);
+    String compoundKey = keyHex + KEY_SEPARATOR + rowCounter++;
+    byte[] valueBytes = serializeRow(row);
+    rocksDBDAO.put(COLUMN_FAMILY, compoundKey, valueBytes);
+  }
+
+  @Override
+  @Nullable
+  public List<RowData> getRows(RowData key) throws IOException {
+    String prefix = serializeKeyToHex(key) + KEY_SEPARATOR;
+    List<byte[]> rawValues = rocksDBDAO.<byte[]>prefixSearch(COLUMN_FAMILY, 
prefix)
+        .map(pair -> pair.getValue())
+        .collect(Collectors.toList());
+    if (rawValues.isEmpty()) {
+      return null;
+    }
+    List<RowData> result = new ArrayList<>(rawValues.size());
+    for (byte[] bytes : rawValues) {
+      result.add(deserializeRow(bytes));
+    }
+    return result;
+  }
+
+  @Override
+  public void clear() {
+    if (rocksDBDAO != null) {
+      rocksDBDAO.close();
+    }
+    rocksDBDAO = createDAO();
+    rowCounter = 0L;
+    log.debug("RocksDB lookup cache cleared and reinitialized at {}", 
rocksDbBasePath);
+  }
+
+  @Override
+  public void close() {
+    if (rocksDBDAO != null) {
+      rocksDBDAO.close();
+      rocksDBDAO = null;
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Internals
+  // -------------------------------------------------------------------------
+
+  private RocksDBDAO createDAO() {
+    ConcurrentHashMap<String, CustomSerializer<?>> serializers = new 
ConcurrentHashMap<>();
+    serializers.put(COLUMN_FAMILY, new PassThroughSerializer());
+    RocksDBDAO dao = new RocksDBDAO("hudi-lookup", rocksDbBasePath, 
serializers);
+    dao.addColumnFamily(COLUMN_FAMILY);
+    return dao;
+  }
+
+  private String serializeKeyToHex(RowData key) throws IOException {
+    keyOutputBuffer.clear();
+    keySerializer.serialize(key, keyOutputBuffer);
+    return bytesToHex(keyOutputBuffer.getCopyOfBuffer());
+  }
+
+  private byte[] serializeRow(RowData row) throws IOException {
+    rowOutputBuffer.clear();
+    rowSerializer.serialize(row, rowOutputBuffer);
+    return rowOutputBuffer.getCopyOfBuffer();
+  }
+
+  private RowData deserializeRow(byte[] bytes) throws IOException {
+    rowInputBuffer.setBuffer(bytes);
+    return rowSerializer.deserialize(rowInputBuffer);
+  }
+
+  private static String bytesToHex(byte[] bytes) {
+    StringBuilder sb = new StringBuilder(bytes.length * 2);
+    for (byte b : bytes) {
+      sb.append(String.format("%02x", b));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * A {@link CustomSerializer} that treats the serialized form as the raw 
byte array itself,
+   * bypassing Java object serialization overhead. Used to store 
pre-serialized Flink
+   * {@link RowData} bytes directly in RocksDB.
+   */
+  private static class PassThroughSerializer implements 
CustomSerializer<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public byte[] serialize(byte[] input) {
+      return input;
+    }
+
+    @Override
+    public byte[] deserialize(byte[] bytes) {
+      return bytes;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 239a254a4f5f..17c0a2d069c2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -826,8 +826,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndAsyncLookupParams")
-  void testLookupJoin(HoodieTableType tableType, boolean async) {
+  @MethodSource("tableTypeCacheTypeAndAsyncLookupParams")
+  void testLookupJoin(HoodieTableType tableType, String cacheType, boolean 
async) {
     TableEnvironment tableEnv = streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
@@ -850,7 +850,8 @@ public class ITTestHoodieDataSource {
 
     // Join two hudi tables with the same data
     String sql = "insert into t2 select b.* from t1_view o "
-        + "       join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day', 
'lookup.async'='" + async + "') */  "
+        + "       join t1/*+ OPTIONS('lookup.join.cache.ttl'='2 day', 
'lookup.async'='" + async + "',"
+        + "       'lookup.join.cache.type'='" + cacheType + "') */  "
         + "       FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
     execInsertSql(tableEnv, sql);
     List<Row> result = CollectionUtil.iterableToList(
@@ -870,14 +871,15 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndAsyncLookupParams")
-  void testLookup(HoodieTableType tableType, boolean async) {
+  @MethodSource("tableTypeCacheTypeAndAsyncLookupParams")
+  void testLookup(HoodieTableType tableType, String cacheType, boolean async) {
     initTablesForLookupJoin(tableType);
     execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (1, 11, 111, 1111), 
(2, 22, 222, 2222)");
     execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3)");
 
     String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ 
OPTIONS('lookup.async'='" + async
-        + "', 'lookup.join.cache.ttl'='1s') */ for system_time as of 
T.proctime AS D ON T.i = D.i";
+        + "', 'lookup.join.cache.type'='" + cacheType + "', 
'lookup.join.cache.ttl'='1s') */"
+        + " for system_time as of T.proctime AS D ON T.i = D.i";
     List<Row> result = CollectionUtil.iterableToList(() -> 
streamTableEnv.executeSql(query).collect());
     assertThat(result).containsExactlyInAnyOrder(
         Row.of(1, 11, 111, 1111),
@@ -3188,12 +3190,16 @@ public class ITTestHoodieDataSource {
   /**
-   * Return test params => (table type, async lookup).
+   * Return test params => (table type, cache type, async lookup).
    */
-  private static Stream<Arguments> tableTypeAndAsyncLookupParams() {
+  private static Stream<Arguments> tableTypeCacheTypeAndAsyncLookupParams() {
     Object[][] data = new Object[][] {
-        {HoodieTableType.COPY_ON_WRITE, false},
-        {HoodieTableType.COPY_ON_WRITE, true},
-        {HoodieTableType.MERGE_ON_READ, false},
-        {HoodieTableType.MERGE_ON_READ, true}
+        {HoodieTableType.COPY_ON_WRITE, "heap", false},
+        {HoodieTableType.COPY_ON_WRITE, "heap", true},
+        {HoodieTableType.MERGE_ON_READ, "heap", false},
+        {HoodieTableType.MERGE_ON_READ, "heap", true},
+        {HoodieTableType.COPY_ON_WRITE, "rocksdb", false},
+        {HoodieTableType.COPY_ON_WRITE, "rocksdb", true},
+        {HoodieTableType.MERGE_ON_READ, "rocksdb", false},
+        {HoodieTableType.MERGE_ON_READ, "rocksdb", true}
     };
     return Stream.of(data).map(Arguments::of);
   }

Reply via email to