This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f7b805961e05 feat(flink): Off-heap lookup join cache backed by RocksDB
(#18231)
f7b805961e05 is described below
commit f7b805961e050dc5cd325cc85cc7d367fff4d045
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Feb 23 10:41:18 2026 +0700
feat(flink): Off-heap lookup join cache backed by RocksDB (#18231)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 19 +++
.../apache/hudi/table/lookup/HeapLookupCache.java | 60 +++++++
.../hudi/table/lookup/HoodieLookupFunction.java | 56 +++++--
.../org/apache/hudi/table/lookup/LookupCache.java | 60 +++++++
.../hudi/table/lookup/RocksDBLookupCache.java | 181 +++++++++++++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 28 ++--
6 files changed, 382 insertions(+), 22 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 62b496378958..a7b33421e72b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -1306,6 +1306,25 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(16)
.withDescription("The thread number for lookup async.");
+ public static final ConfigOption<String> LOOKUP_JOIN_CACHE_TYPE =
+ key("lookup.join.cache.type")
+ .stringType()
+ .defaultValue("heap")
+ .withDescription("The storage backend for the lookup join cache. "
+ + "Possible values: 'heap' (default) stores all dimension-table
rows in JVM heap memory "
+ + "(may cause OutOfMemoryError for large tables); "
+ + "'rocksdb' stores rows off-heap in an embedded RocksDB
instance on local disk, "
+ + "which avoids OOM at the cost of additional serialization
overhead.");
+
+ public static final ConfigOption<String> LOOKUP_JOIN_ROCKSDB_PATH =
+ key("lookup.join.rocksdb.path")
+ .stringType()
+ .defaultValue(System.getProperty("java.io.tmpdir") +
"/hudi-lookup-rocksdb")
+ .withDescription("Local directory path for storing RocksDB data when
"
+ + "'lookup.join.cache.type' is set to 'rocksdb'. "
+ + "Each task manager will create a unique subdirectory under
this path. "
+ + "The directory is cleaned up when the lookup function is
closed.");
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
new file mode 100644
index 000000000000..455a6432fd81
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HeapLookupCache.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Heap-based {@link LookupCache} backed by a {@link HashMap}.
+ *
+ * <p>Suitable for small-to-medium dimension tables. For large tables, use
+ * {@link RocksDBLookupCache} to avoid JVM OutOfMemoryError.
+ */
+public class HeapLookupCache implements LookupCache {
+
+ private final Map<RowData, List<RowData>> store = new HashMap<>();
+
+ @Override
+ public void addRow(RowData key, RowData row) {
+ store.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+ }
+
+ @Override
+ @Nullable
+ public List<RowData> getRows(RowData key) {
+ return store.get(key);
+ }
+
+ @Override
+ public void clear() {
+ store.clear();
+ }
+
+ @Override
+ public void close() {
+ store.clear();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index 6663eefd01a7..443cba0bcf50 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.StreamerUtil;
@@ -38,21 +39,25 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import java.io.Closeable;
+import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.stream.Collectors;
/**
* Lookup function for Hoodie dimension table.
*
* <p>Note: reference Flink FileSystemLookupFunction to avoid additional
connector jar dependencies.
+ *
+ * <p>The underlying cache can be heap-based ({@code
lookup.join.cache.type=heap}, default) or
+ * RocksDB-backed ({@code lookup.join.cache.type=rocksdb}). The RocksDB option
stores all dimension
+ * table rows off-heap on local disk, preventing OutOfMemoryError when the
dimension table is large.
*/
@Slf4j
-public class HoodieLookupFunction extends LookupFunction implements
Serializable, Closeable {
+public class HoodieLookupFunction extends LookupFunction implements
Serializable, Closeable {
private static final long serialVersionUID = 1L;
@@ -67,9 +72,10 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
private final Duration reloadInterval;
private final TypeSerializer<RowData> serializer;
private final RowType rowType;
+ private final int[] lookupKeys;
// cache for lookup data
- private transient Map<RowData, List<RowData>> cache;
+ private transient LookupCache cache;
// timestamp when cache expires
private transient long nextLoadTime;
@@ -86,6 +92,7 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
Configuration conf) {
this.partitionReader = partitionReader;
this.rowType = rowType;
+ this.lookupKeys = lookupKeys;
this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
for (int i = 0; i < lookupKeys.length; i++) {
lookupFieldGetters[i] =
@@ -99,7 +106,7 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
@Override
public void open(FunctionContext context) throws Exception {
functionContext = context;
- cache = new HashMap<>();
+ cache = createCache();
nextLoadTime = -1L;
org.apache.hadoop.conf.Configuration hadoopConf =
HadoopConfigurations.getHadoopConf(conf);
metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
@@ -109,13 +116,13 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
public Collection<RowData> lookup(RowData keyRow) {
try {
checkCacheReload();
- return cache.get(keyRow);
+ return cache.getRows(keyRow);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- private void checkCacheReload() {
+ private void checkCacheReload() throws IOException {
if (nextLoadTime > System.currentTimeMillis()) {
return;
}
@@ -151,10 +158,10 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
count++;
RowData rowData = serializer.copy(row);
RowData key = extractLookupKey(rowData);
- List<RowData> rows = cache.computeIfAbsent(key, k -> new
ArrayList<>());
- rows.add(rowData);
+ cache.addRow(key, rowData);
}
partitionReader.close();
+ currentCommit = latestCommitInstant.get();
nextLoadTime = System.currentTimeMillis() + reloadInterval.toMillis();
log.info("Loaded {} row(s) into lookup join cache", count);
return;
@@ -188,7 +195,34 @@ public class HoodieLookupFunction extends LookupFunction
implements Serializabl
@Override
public void close() {
- // no operation
+ if (cache != null) {
+ try {
+ cache.close();
+ } catch (Exception e) {
+ log.warn("Failed to close lookup cache", e);
+ }
+ cache = null;
+ }
+ }
+
+ private LookupCache createCache() {
+ String cacheType = conf.get(FlinkOptions.LOOKUP_JOIN_CACHE_TYPE);
+ if ("rocksdb".equalsIgnoreCase(cacheType)) {
+ String rocksDbPath = conf.get(FlinkOptions.LOOKUP_JOIN_ROCKSDB_PATH);
+ log.info("Creating RocksDB lookup cache at {}", rocksDbPath);
+ RowType keyRowType = buildKeyRowType();
+ TypeSerializer<RowData> keySerializer =
InternalSerializers.create(keyRowType);
+ return new RocksDBLookupCache(keySerializer, serializer, rocksDbPath);
+ }
+ log.info("Creating heap lookup cache");
+ return new HeapLookupCache();
+ }
+
+ private RowType buildKeyRowType() {
+ List<RowType.RowField> keyFields = Arrays.stream(lookupKeys)
+ .mapToObj(i -> rowType.getFields().get(i))
+ .collect(Collectors.toList());
+ return new RowType(keyFields);
}
@VisibleForTesting
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
new file mode 100644
index 000000000000..93e99c913e7c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupCache.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Cache abstraction for lookup join dimension table data.
+ *
+ * <p>Implementations may store data on-heap (e.g. {@link HeapLookupCache}) or
off-heap
+ * (e.g. {@link RocksDBLookupCache}) to avoid JVM OutOfMemoryError on large
tables.
+ */
+public interface LookupCache extends Closeable {
+
+ /**
+ * Adds a single row to the cache under the given lookup key.
+ * Multiple rows may be associated with the same key.
+ *
+ * @param key the lookup key row (contains only the join key fields)
+ * @param row the full dimension table row
+ * @throws IOException if the write fails
+ */
+ void addRow(RowData key, RowData row) throws IOException;
+
+ /**
+ * Returns all rows matching the given lookup key, or {@code null} / empty
list if none exist.
+ *
+ * @param key the lookup key row
+ * @return matching rows, or {@code null} if not found
+ */
+ @Nullable
+ List<RowData> getRows(RowData key) throws IOException;
+
+ /**
+ * Clears all entries from the cache so it can be reloaded.
+ */
+ void clear() throws IOException;
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
new file mode 100644
index 000000000000..4bb41dfcc9a6
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/RocksDBLookupCache.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Off-heap {@link LookupCache} backed by RocksDB.
+ *
+ * <p>Data is stored on disk / in the OS page cache, keeping JVM heap usage
minimal. Each lookup
+ * key maps to one or more full dimension-table rows. The compound RocksDB key
encodes the lookup
+ * key bytes in hexadecimal plus a monotonic counter so that multiple rows
sharing the same lookup
+ * key can coexist in the store without overwriting each other. Retrieval is a
prefix scan on the
+ * hex-encoded lookup key.
+ *
+ * <p>Keys are serialized with Flink's {@link TypeSerializer} for the key
{@link
+ * org.apache.flink.table.types.logical.RowType}, and values (full rows) are
serialized with the
+ * row-level {@link TypeSerializer}. The raw bytes are stored as-is in RocksDB
via a
+ * pass-through {@link CustomSerializer} — no additional Java-serialization
overhead.
+ */
+@Slf4j
+public class RocksDBLookupCache implements LookupCache {
+
+ private static final String COLUMN_FAMILY = "lookup_cache";
+
+ /** Separator between the hex-encoded key and the row counter in the
compound RocksDB key. */
+ private static final String KEY_SEPARATOR = "_";
+
+ private final TypeSerializer<RowData> keySerializer;
+ private final TypeSerializer<RowData> rowSerializer;
+ private final String rocksDbBasePath;
+
+ /** Reusable serialization buffer — single-threaded use only. */
+ private final DataOutputSerializer keyOutputBuffer = new
DataOutputSerializer(64);
+ private final DataOutputSerializer rowOutputBuffer = new
DataOutputSerializer(256);
+ private final DataInputDeserializer rowInputBuffer = new
DataInputDeserializer();
+
+ private RocksDBDAO rocksDBDAO;
+ private long rowCounter;
+
+ public RocksDBLookupCache(
+ TypeSerializer<RowData> keySerializer,
+ TypeSerializer<RowData> rowSerializer,
+ String rocksDbBasePath) {
+ this.keySerializer = keySerializer;
+ this.rowSerializer = rowSerializer;
+ this.rocksDbBasePath = rocksDbBasePath;
+ this.rocksDBDAO = createDAO();
+ this.rowCounter = 0L;
+ }
+
+ @Override
+ public void addRow(RowData key, RowData row) throws IOException {
+ String keyHex = serializeKeyToHex(key);
+ String compoundKey = keyHex + KEY_SEPARATOR + rowCounter++;
+ byte[] valueBytes = serializeRow(row);
+ rocksDBDAO.put(COLUMN_FAMILY, compoundKey, valueBytes);
+ }
+
+ @Override
+ @Nullable
+ public List<RowData> getRows(RowData key) throws IOException {
+ String prefix = serializeKeyToHex(key) + KEY_SEPARATOR;
+ List<byte[]> rawValues = rocksDBDAO.<byte[]>prefixSearch(COLUMN_FAMILY,
prefix)
+ .map(pair -> pair.getValue())
+ .collect(Collectors.toList());
+ if (rawValues.isEmpty()) {
+ return null;
+ }
+ List<RowData> result = new ArrayList<>(rawValues.size());
+ for (byte[] bytes : rawValues) {
+ result.add(deserializeRow(bytes));
+ }
+ return result;
+ }
+
+ @Override
+ public void clear() {
+ if (rocksDBDAO != null) {
+ rocksDBDAO.close();
+ }
+ rocksDBDAO = createDAO();
+ rowCounter = 0L;
+ log.debug("RocksDB lookup cache cleared and reinitialized at {}",
rocksDbBasePath);
+ }
+
+ @Override
+ public void close() {
+ if (rocksDBDAO != null) {
+ rocksDBDAO.close();
+ rocksDBDAO = null;
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Internals
+ // -------------------------------------------------------------------------
+
+ private RocksDBDAO createDAO() {
+ ConcurrentHashMap<String, CustomSerializer<?>> serializers = new
ConcurrentHashMap<>();
+ serializers.put(COLUMN_FAMILY, new PassThroughSerializer());
+ RocksDBDAO dao = new RocksDBDAO("hudi-lookup", rocksDbBasePath,
serializers);
+ dao.addColumnFamily(COLUMN_FAMILY);
+ return dao;
+ }
+
+ private String serializeKeyToHex(RowData key) throws IOException {
+ keyOutputBuffer.clear();
+ keySerializer.serialize(key, keyOutputBuffer);
+ return bytesToHex(keyOutputBuffer.getCopyOfBuffer());
+ }
+
+ private byte[] serializeRow(RowData row) throws IOException {
+ rowOutputBuffer.clear();
+ rowSerializer.serialize(row, rowOutputBuffer);
+ return rowOutputBuffer.getCopyOfBuffer();
+ }
+
+ private RowData deserializeRow(byte[] bytes) throws IOException {
+ rowInputBuffer.setBuffer(bytes);
+ return rowSerializer.deserialize(rowInputBuffer);
+ }
+
+ private static String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder(bytes.length * 2);
+ for (byte b : bytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * A {@link CustomSerializer} that treats the serialized form as the raw
byte array itself,
+ * bypassing Java object serialization overhead. Used to store
pre-serialized Flink
+ * {@link RowData} bytes directly in RocksDB.
+ */
+ private static class PassThroughSerializer implements
CustomSerializer<byte[]> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public byte[] serialize(byte[] input) {
+ return input;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] bytes) {
+ return bytes;
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 239a254a4f5f..17c0a2d069c2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -826,8 +826,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndAsyncLookupParams")
- void testLookupJoin(HoodieTableType tableType, boolean async) {
+ @MethodSource("tableTypeCacheTypeAndAsyncLookupParams")
+ void testLookupJoin(HoodieTableType tableType, String cacheType, boolean
async) {
TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
@@ -850,7 +850,8 @@ public class ITTestHoodieDataSource {
// Join two hudi tables with the same data
String sql = "insert into t2 select b.* from t1_view o "
- + " join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day',
'lookup.async'='" + async + "') */ "
+ + " join t1/*+ OPTIONS('lookup.join.cache.ttl'='2 day',
'lookup.async'='" + async + "',"
+ + " 'lookup.join.cache.type'='" + cacheType + "') */ "
+ " FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
execInsertSql(tableEnv, sql);
List<Row> result = CollectionUtil.iterableToList(
@@ -870,14 +871,15 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @MethodSource("tableTypeAndAsyncLookupParams")
- void testLookup(HoodieTableType tableType, boolean async) {
+ @MethodSource("tableTypeCacheTypeAndAsyncLookupParams")
+ void testLookup(HoodieTableType tableType, String cacheType, boolean async) {
initTablesForLookupJoin(tableType);
execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (1, 11, 111, 1111),
(2, 22, 222, 2222)");
execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3)");
String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+
OPTIONS('lookup.async'='" + async
- + "', 'lookup.join.cache.ttl'='1s') */ for system_time as of
T.proctime AS D ON T.i = D.i";
+ + "', 'lookup.join.cache.type'='" + cacheType + "',
'lookup.join.cache.ttl'='1s') */"
+ + " for system_time as of T.proctime AS D ON T.i = D.i";
List<Row> result = CollectionUtil.iterableToList(() ->
streamTableEnv.executeSql(query).collect());
assertThat(result).containsExactlyInAnyOrder(
Row.of(1, 11, 111, 1111),
@@ -3188,12 +3190,16 @@ public class ITTestHoodieDataSource {
/**
* Return test params => (table type, async lookup).
*/
- private static Stream<Arguments> tableTypeAndAsyncLookupParams() {
+ private static Stream<Arguments> tableTypeCacheTypeAndAsyncLookupParams() {
Object[][] data = new Object[][] {
- {HoodieTableType.COPY_ON_WRITE, false},
- {HoodieTableType.COPY_ON_WRITE, true},
- {HoodieTableType.MERGE_ON_READ, false},
- {HoodieTableType.MERGE_ON_READ, true}
+ {HoodieTableType.COPY_ON_WRITE, "heap", false},
+ {HoodieTableType.COPY_ON_WRITE, "heap", true},
+ {HoodieTableType.MERGE_ON_READ, "heap", false},
+ {HoodieTableType.MERGE_ON_READ, "heap", true},
+ {HoodieTableType.COPY_ON_WRITE, "rocksdb", false},
+ {HoodieTableType.COPY_ON_WRITE, "rocksdb", true},
+ {HoodieTableType.MERGE_ON_READ, "rocksdb", false},
+ {HoodieTableType.MERGE_ON_READ, "rocksdb", true}
};
return Stream.of(data).map(Arguments::of);
}