This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e55dde87 [FLINK-29154] Support LookupTableSource for table store
e55dde87 is described below

commit e55dde87296b51132d37fb4265bd3c863f05db40
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Sep 6 16:20:58 2022 +0800

    [FLINK-29154] Support LookupTableSource for table store
    
    This closes #281
---
 docs/content/docs/development/configuration.md     |   2 +-
 docs/content/docs/development/lookup-join.md       |  93 +++++
 docs/content/docs/development/rescale-bucket.md    |   2 +-
 docs/content/docs/development/roadmap.md           |   2 +-
 .../generated/rocksdb_configuration.html           | 114 ++++++
 .../store/file/predicate/PredicateBuilder.java     |   6 +
 .../store/file/predicate/PredicateConverter.java   |  21 +-
 .../store/file/predicate/PredicateFilter.java      |  42 ++
 .../table/store/utils/KeyProjectedRowData.java     | 163 ++++++++
 flink-table-store-connector/pom.xml                |   6 +
 .../table/store/connector/RocksDBOptions.java      | 349 +++++++++++++++++
 .../connector/lookup/FileStoreLookupFunction.java  | 216 +++++++++++
 .../table/store/connector/lookup/LookupTable.java  |  53 +++
 .../connector/lookup/PrimaryKeyLookupTable.java    |  88 +++++
 .../store/connector/lookup/RocksDBSetState.java    | 124 ++++++
 .../table/store/connector/lookup/RocksDBState.java | 130 +++++++
 .../connector/lookup/RocksDBStateFactory.java      | 100 +++++
 .../store/connector/lookup/RocksDBValueState.java  | 102 +++++
 .../lookup/SecondaryIndexLookupTable.java          |  97 +++++
 .../source/ContinuousFileSplitEnumerator.java      |  76 +---
 .../store/connector/source/FileStoreSource.java    |   2 +-
 .../store/connector/source/TableStoreSource.java   |  21 +
 .../table/store/connector/LookupJoinITCase.java    | 425 +++++++++++++++++++++
 .../store/connector/lookup/LookupTableTest.java    | 170 +++++++++
 .../store/table/source/SnapshotEnumerator.java     |  97 +++++
 .../store/table/source/TableStreamingReader.java   | 123 ++++++
 .../file/predicate/PredicateConverterTest.java     |  14 +-
 27 files changed, 2553 insertions(+), 85 deletions(-)

diff --git a/docs/content/docs/development/configuration.md b/docs/content/docs/development/configuration.md
index 5d15dc35..2b6ed92a 100644
--- a/docs/content/docs/development/configuration.md
+++ b/docs/content/docs/development/configuration.md
@@ -1,6 +1,6 @@
 ---
 title: "Configuration"
-weight: 7
+weight: 8
 type: docs
 aliases:
 - /development/configuration.html
diff --git a/docs/content/docs/development/lookup-join.md b/docs/content/docs/development/lookup-join.md
new file mode 100644
index 00000000..85ed8a3b
--- /dev/null
+++ b/docs/content/docs/development/lookup-join.md
@@ -0,0 +1,93 @@
+---
+title: "Lookup Join"
+weight: 6
+type: docs
+aliases:
+- /development/lookup-join.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Lookup Join
+
+A [Lookup Join](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/joins/)
+is used to enrich a table with data that is queried from Flink Table Store.
+The join requires one table to have a processing time attribute and the other
+table to be backed by a lookup source connector.
+
+First, create a table and update it in real time.
+
+```sql
+-- Create a table store catalog
+CREATE CATALOG my_catalog WITH (
+  'type'='table-store',
+  'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file://tmp/foo/bar'
+);
+
+USE CATALOG my_catalog;
+
+-- Create a table in table-store catalog
+CREATE TABLE customers (
+  id INT PRIMARY KEY NOT ENFORCED,
+  name STRING,
+  country STRING,
+  zip STRING
+);
+
+-- Launch a streaming job to update customers table
+INSERT INTO customers ...
+```
+
+Then, you can use this table in a lookup join.
+
+```sql
+-- enrich each order with customer information
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+              ON o.customer_id = c.id;
+```
+
+The lookup join node maintains a local RocksDB cache and pulls the latest
+updates of the table in real time, fetching only the necessary data. Your
+filter conditions are therefore very important.
+
+To avoid excessive use of local disks, the lookup join feature is only suitable
+for tables with at most tens of millions of rows.
+
+{{< hint info >}}
+__Note:__ Partitioned tables and tables without primary keys are not supported yet.
+{{< /hint >}}
+
+Projection pushdown can effectively reduce the overhead.
+[FLINK-29138](https://issues.apache.org/jira/browse/FLINK-29138) fixed a bug that
+prevented the projection from being pushed down to the source, so it is preferable
+to use Flink `1.14.6` / `1.15.3` or later, or to cherry-pick that fix into your own
+Flink branch.
+
+## RocksDBOptions
+
+Options for the RocksDB cache. You can configure them in the table's `WITH` clause or with dynamic table hints.
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+  FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
+  FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
+```
+
+{{< generated/rocksdb_configuration >}}
diff --git a/docs/content/docs/development/rescale-bucket.md b/docs/content/docs/development/rescale-bucket.md
index c937256a..6e688e9f 100644
--- a/docs/content/docs/development/rescale-bucket.md
+++ b/docs/content/docs/development/rescale-bucket.md
@@ -1,6 +1,6 @@
 ---
 title: "Rescale Bucket"
-weight: 6
+weight: 7
 type: docs
 aliases:
 - /development/rescale-bucket.html
diff --git a/docs/content/docs/development/roadmap.md b/docs/content/docs/development/roadmap.md
index f88ce6bf..9c335948 100644
--- a/docs/content/docs/development/roadmap.md
+++ b/docs/content/docs/development/roadmap.md
@@ -1,6 +1,6 @@
 ---
 title: "Roadmap"
-weight: 8
+weight: 9
 type: docs
 aliases:
 - /development/roadmap.html
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
new file mode 100644
index 00000000..d6d409db
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -0,0 +1,114 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>lookup.cache-rows</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Long</td>
+            <td>The maximum number of rows to store in the cache</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.blocksize</h5></td>
+            <td style="word-wrap: break-word;">4 kb</td>
+            <td>MemorySize</td>
+            <td>The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.cache-size</h5></td>
+            <td style="word-wrap: break-word;">8 mb</td>
+            <td>MemorySize</td>
+            <td>The amount of the cache for data blocks in RocksDB. The default block-cache size is '8MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.metadata-blocksize</h5></td>
+            <td style="word-wrap: break-word;">4 kb</td>
+            <td>MemorySize</td>
+            <td>Approximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. The default blocksize is '4KB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.bloom-filter.bits-per-key</h5></td>
+            <td style="word-wrap: break-word;">10.0</td>
+            <td>Double</td>
+            <td>Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.bloom-filter.block-based-mode</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is 'false'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.max-size-level-base</h5></td>
+            <td style="word-wrap: break-word;">256 mb</td>
+            <td>MemorySize</td>
+            <td>The upper-bound of the total size of level base files in bytes. The default value is '256MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.target-file-size-base</h5></td>
+            <td style="word-wrap: break-word;">64 mb</td>
+            <td>MemorySize</td>
+            <td>The target file size for compaction, which determines a level-1 file size. The default value is '64MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.use-dynamic-size</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is 'false'. For more information, please refer to <a href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true">RocksDB's doc.</a></td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.style</h5></td>
+            <td style="word-wrap: break-word;">LEVEL</td>
+            <td><p>Enum</p></td>
+            <td>The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as default style.<br /><br />Possible values:<ul><li>"LEVEL"</li><li>"UNIVERSAL"</li><li>"FIFO"</li><li>"NONE"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compression.type</h5></td>
+            <td style="word-wrap: break-word;">LZ4_COMPRESSION</td>
+            <td><p>Enum</p></td>
+            <td>The compression type.<br /><br />Possible values:<ul><li>"NO_COMPRESSION"</li><li>"SNAPPY_COMPRESSION"</li><li>"ZLIB_COMPRESSION"</li><li>"BZLIB2_COMPRESSION"</li><li>"LZ4_COMPRESSION"</li><li>"LZ4HC_COMPRESSION"</li><li>"XPRESS_COMPRESSION"</li><li>"ZSTD_COMPRESSION"</li><li>"DISABLE_COMPRESSION_OPTION"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.files.open</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Integer</td>
+            <td>The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. The default value is '-1'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.thread.num</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>The maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is '2'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.use-bloom-filter</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, every newly created SST file will contain a Bloom filter. It is disabled by default.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.count</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>The maximum number of write buffers that are built up in memory. The default value is '2'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.number-to-merge</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.size</h5></td>
+            <td style="word-wrap: break-word;">64 mb</td>
+            <td>MemorySize</td>
+            <td>The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is '64MB'.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index a1b86a2d..cdaff153 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -49,9 +49,15 @@ import static java.util.Collections.singletonList;
 public class PredicateBuilder {
 
     private final RowType rowType;
+    private final List<String> fieldNames;
 
     public PredicateBuilder(RowType rowType) {
         this.rowType = rowType;
+        this.fieldNames = rowType.getFieldNames();
+    }
+
+    public int indexOf(String field) {
+        return fieldNames.indexOf(field);
     }
 
     public Predicate equal(int idx, Object literal) {
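
The new `indexOf` lets callers resolve a field's position by name before building a predicate, which the name-based conversion below relies on. A minimal sketch of that usage; the class name and two-column row type are invented for illustration and are not part of the patch:

```java
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class PredicateBuilderIndexOfExample {
    public static void main(String[] args) {
        // Hypothetical schema: (id INT, name STRING)
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                        new String[] {"id", "name"});

        PredicateBuilder builder = new PredicateBuilder(rowType);
        // Resolve the position by field name, then build the predicate by index.
        Predicate idEquals42 = builder.equal(builder.indexOf("id"), 42);
    }
}
```
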
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index baaf768d..9cdfcaf6 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -47,7 +47,12 @@ import java.util.regex.Pattern;
 import static 
org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
 
-/** Convert {@link Expression} to {@link Predicate}. */
+/**
+ * Convert {@link Expression} to {@link Predicate}.
+ *
+ * <p>For {@link FieldReferenceExpression}, resolve the field by name instead of by index:
+ * when projection pushdown happens before filter pushdown, the field indexes seen by the
+ * filter refer to the projected row type rather than to the original one.
+ */
 public class PredicateConverter implements ExpressionVisitor<Predicate> {
 
     private final PredicateBuilder builder;
@@ -91,15 +96,17 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
             for (int i = 1; i < children.size(); i++) {
                 literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), 
children.get(i)));
             }
-            return builder.in(fieldRefExpr.getInputIndex(), literals);
+            return builder.in(builder.indexOf(fieldRefExpr.getName()), 
literals);
         } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
             return extractFieldReference(children.get(0))
-                    .map(FieldReferenceExpression::getFieldIndex)
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
                     .map(builder::isNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
             return extractFieldReference(children.get(0))
-                    .map(FieldReferenceExpression::getFieldIndex)
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
                     .map(builder::isNotNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.LIKE) {
@@ -164,7 +171,7 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
                     Matcher beginMatcher = 
BEGIN_PATTERN.matcher(escapedSqlPattern);
                     if (beginMatcher.matches()) {
                         return builder.startsWith(
-                                fieldRefExpr.getFieldIndex(),
+                                builder.indexOf(fieldRefExpr.getName()),
                                 
BinaryStringData.fromString(beginMatcher.group(1)));
                     }
                 }
@@ -184,13 +191,13 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
         if (fieldRefExpr.isPresent()) {
             Object literal =
                     extractLiteral(fieldRefExpr.get().getOutputDataType(), 
children.get(1));
-            return visit1.apply(fieldRefExpr.get().getFieldIndex(), literal);
+            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), 
literal);
         } else {
             fieldRefExpr = extractFieldReference(children.get(1));
             if (fieldRefExpr.isPresent()) {
                 Object literal =
                         extractLiteral(fieldRefExpr.get().getOutputDataType(), 
children.get(0));
-                return visit2.apply(fieldRefExpr.get().getFieldIndex(), 
literal);
+                return 
visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
             }
         }
 
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
new file mode 100644
index 00000000..2c4f0528
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+/** A {@link java.util.function.Predicate} to filter {@link RowData}. */
+public class PredicateFilter implements java.util.function.Predicate<RowData> {
+
+    private final RowDataToObjectArrayConverter arrayConverter;
+    @Nullable private final Predicate predicate;
+
+    public PredicateFilter(RowType rowType, @Nullable Predicate predicate) {
+        this.arrayConverter = new RowDataToObjectArrayConverter(rowType);
+        this.predicate = predicate;
+    }
+
+    @Override
+    public boolean test(RowData rowData) {
+        return predicate == null || 
predicate.test(arrayConverter.convert(rowData));
+    }
+}
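
A small usage sketch of the new filter, combining it with the name-based `PredicateBuilder` lookup above. The schema and literal values are invented, and the literal is assumed to be internal string data (as in the `BinaryStringData` usage elsewhere in this patch); a `null` predicate accepts every row:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class PredicateFilterExample {
    public static void main(String[] args) {
        // Hypothetical schema: (id INT, country STRING)
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                        new String[] {"id", "country"});
        PredicateBuilder builder = new PredicateBuilder(rowType);

        // Keep only rows whose country column equals 'DE'.
        PredicateFilter filter =
                new PredicateFilter(
                        rowType,
                        builder.equal(builder.indexOf("country"), StringData.fromString("DE")));

        boolean kept = filter.test(GenericRowData.of(1, StringData.fromString("DE")));    // true
        boolean dropped = filter.test(GenericRowData.of(2, StringData.fromString("US"))); // false
    }
}
```
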
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java
new file mode 100644
index 00000000..c3139a81
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+
+/** A {@link RowData} to project key fields with {@link RowKind#INSERT}. */
+public class KeyProjectedRowData implements RowData {
+
+    private final int[] indexMapping;
+
+    private RowData row;
+
+    public KeyProjectedRowData(int[] indexMapping) {
+        this.indexMapping = indexMapping;
+    }
+
+    public KeyProjectedRowData replaceRow(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return RowKind.INSERT;
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        throw new UnsupportedOperationException("Key row data should always be 
insert only.");
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(indexMapping[pos]);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(indexMapping[pos]);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(indexMapping[pos]);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(indexMapping[pos]);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(indexMapping[pos]);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(indexMapping[pos]);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(indexMapping[pos]);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(indexMapping[pos]);
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return row.getString(indexMapping[pos]);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(indexMapping[pos], precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return row.getTimestamp(indexMapping[pos], precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return row.getRawValue(indexMapping[pos]);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(indexMapping[pos]);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return row.getArray(indexMapping[pos]);
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return row.getMap(indexMapping[pos]);
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return row.getRow(indexMapping[pos], numFields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException("Projected row data cannot be 
compared");
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("Projected row data cannot be 
hashed");
+    }
+
+    @Override
+    public String toString() {
+        return getRowKind().shortString()
+                + "{"
+                + "indexMapping="
+                + Arrays.toString(indexMapping)
+                + ", mutableRow="
+                + row
+                + '}';
+    }
+}
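
A short sketch of how the projected row is intended to be used: wrap a full row once, then read only the key columns through the index mapping. The field layout below is made up for illustration:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.utils.KeyProjectedRowData;

public class KeyProjectedRowDataExample {
    public static void main(String[] args) {
        // Full row layout (hypothetical): (name STRING, id INT, zip STRING)
        GenericRowData row = GenericRowData.of(
                StringData.fromString("alice"), 42, StringData.fromString("10115"));

        // Project the key as (id, zip) without copying the row.
        KeyProjectedRowData key = new KeyProjectedRowData(new int[] {1, 2});
        key.replaceRow(row);

        int id = key.getInt(0);                   // 42, read through indexMapping[0] = 1
        String zip = key.getString(1).toString(); // "10115"
        // The wrapper is mutable; replaceRow can be called again for the next record.
    }
}
```
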
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 45db1b95..aab108e4 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -68,6 +68,12 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>6.20.3-ververica-1.0</version>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java
new file mode 100644
index 00000000..2889abd1
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.TableFormatConfig;
+
+import java.io.File;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.rocksdb.CompactionStyle.FIFO;
+import static org.rocksdb.CompactionStyle.LEVEL;
+import static org.rocksdb.CompactionStyle.NONE;
+import static org.rocksdb.CompactionStyle.UNIVERSAL;
+import static org.rocksdb.CompressionType.LZ4_COMPRESSION;
+import static org.rocksdb.InfoLogLevel.INFO_LEVEL;
+
+/** Options for rocksdb. Copied from flink {@code RocksDBConfigurableOptions}. */
+public class RocksDBOptions {
+
+    public static final ConfigOption<Long> LOOKUP_CACHE_ROWS =
+            key("lookup.cache-rows")
+                    .longType()
+                    .defaultValue(10_000L)
+                    .withDescription("The maximum number of rows to store in 
the cache");
+
+    // 
--------------------------------------------------------------------------
+    // Provided configurable DBOptions within Flink
+    // 
--------------------------------------------------------------------------
+
+    public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
+            key("rocksdb.thread.num")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            "The maximum number of concurrent background flush 
and compaction jobs (per stateful operator). "
+                                    + "The default value is '2'.");
+
+    public static final ConfigOption<Integer> MAX_OPEN_FILES =
+            key("rocksdb.files.open")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The maximum number of open files (per stateful 
operator) that can be used by the DB, '-1' means no limit. "
+                                    + "The default value is '-1'.");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<MemorySize> LOG_MAX_FILE_SIZE =
+            key("rocksdb.log.max-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("25mb"))
+                    .withDescription(
+                            "The maximum size of RocksDB's file used for 
information logging. "
+                                    + "If the log files becomes larger than 
this, a new file will be created. "
+                                    + "If 0, all logs will be written to one 
log file. "
+                                    + "The default maximum file size is 
'25MB'. ");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<Integer> LOG_FILE_NUM =
+            key("rocksdb.log.file-num")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription(
+                            "The maximum number of files RocksDB should keep 
for information logging (Default setting: 4).");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<String> LOG_DIR =
+            key("rocksdb.log.dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The directory for RocksDB's information logging 
files. "
+                                    + "If empty (Flink default setting), log 
files will be in the same directory as the Flink log. "
+                                    + "If non-empty, this directory will be 
used and the data directory's absolute path will be used as the prefix of the 
log file name.");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<InfoLogLevel> LOG_LEVEL =
+            key("rocksdb.log.level")
+                    .enumType(InfoLogLevel.class)
+                    .defaultValue(INFO_LEVEL)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The specified information logging 
level for RocksDB. "
+                                                    + "If unset, Flink will 
use %s.",
+                                            code(INFO_LEVEL.name()))
+                                    .linebreak()
+                                    .text(
+                                            "Note: RocksDB info logs will not 
be written to the TaskManager logs and there "
+                                                    + "is no rolling strategy, 
unless you configure %s, %s, and %s accordingly. "
+                                                    + "Without a rolling 
strategy, long-running tasks may lead to uncontrolled "
+                                                    + "disk space usage if 
configured with increased log levels!",
+                                            code(LOG_DIR.key()),
+                                            code(LOG_MAX_FILE_SIZE.key()),
+                                            code(LOG_FILE_NUM.key()))
+                                    .linebreak()
+                                    .text(
+                                            "There is no need to modify the 
RocksDB log level, unless for troubleshooting RocksDB.")
+                                    .build());
+
+    // 
--------------------------------------------------------------------------
+    // Provided configurable ColumnFamilyOptions within Flink
+    // 
--------------------------------------------------------------------------
+
+    public static final ConfigOption<CompressionType> COMPRESSION_TYPE =
+            key("rocksdb.compression.type")
+                    .enumType(CompressionType.class)
+                    .defaultValue(LZ4_COMPRESSION)
+                    .withDescription("The compression type.");
+
+    public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
+            key("rocksdb.compaction.style")
+                    .enumType(CompactionStyle.class)
+                    .defaultValue(LEVEL)
+                    .withDescription(
+                            String.format(
+                                    "The specified compaction style for DB. 
Candidate compaction style is %s, %s, %s or %s, "
+                                            + "and Flink chooses '%s' as 
default style.",
+                                    LEVEL.name(),
+                                    FIFO.name(),
+                                    UNIVERSAL.name(),
+                                    NONE.name(),
+                                    LEVEL.name()));
+
+    public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE =
+            key("rocksdb.compaction.level.use-dynamic-size")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If true, RocksDB will pick target 
size of each level dynamically. From an empty DB, ")
+                                    .text(
+                                            "RocksDB would make last level the 
base level, which means merging L0 data into the last level, ")
+                                    .text(
+                                            "until it exceeds 
max_bytes_for_level_base. And then repeat this process for second last level 
and so on. ")
+                                    .text("The default value is 'false'. ")
+                                    .text(
+                                            "For more information, please 
refer to %s",
+                                            link(
+                                                    
"https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true";,
+                                                    "RocksDB's doc."))
+                                    .build());
+
+    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
+            key("rocksdb.compaction.level.target-file-size-base")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64mb"))
+                    .withDescription(
+                            "The target file size for compaction, which 
determines a level-1 file size. "
+                                    + "The default value is '64MB'.");
+
+    public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
+            key("rocksdb.compaction.level.max-size-level-base")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("256mb"))
+                    .withDescription(
+                            "The upper-bound of the total size of level base 
files in bytes. "
+                                    + "The default value is '256MB'.");
+
+    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
+            key("rocksdb.writebuffer.size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64mb"))
+                    .withDescription(
+                            "The amount of data built up in memory (backed by 
an unsorted log on disk) "
+                                    + "before converting to a sorted on-disk 
files. The default writebuffer size is '64MB'.");
+
+    public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
+            key("rocksdb.writebuffer.count")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            "The maximum number of write buffers that are 
built up in memory. "
+                                    + "The default value is '2'.");
+
+    public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE 
=
+            key("rocksdb.writebuffer.number-to-merge")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The minimum number of write buffers that will be 
merged together before writing to storage. "
+                                    + "The default value is '1'.");
+
+    public static final ConfigOption<MemorySize> BLOCK_SIZE =
+            key("rocksdb.block.blocksize")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("4kb"))
+                    .withDescription(
+                            "The approximate size (in bytes) of user data 
packed per block. "
+                                    + "The default blocksize is '4KB'.");
+
+    public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+            key("rocksdb.block.metadata-blocksize")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("4kb"))
+                    .withDescription(
+                            "Approximate size of partitioned metadata packed 
per block. "
+                                    + "Currently applied to indexes block when 
partitioned index/filters option is enabled. "
+                                    + "The default blocksize is '4KB'.");
+
+    public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
+            key("rocksdb.block.cache-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("8mb"))
+                    .withDescription(
+                            "The amount of the cache for data blocks in 
RocksDB. "
+                                    + "The default block-cache size is 
'8MB'.");
+
+    public static final ConfigOption<Boolean> USE_BLOOM_FILTER =
+            key("rocksdb.use-bloom-filter")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, every newly created SST file will 
contain a Bloom filter. "
+                                    + "It is disabled by default.");
+
+    public static final ConfigOption<Double> BLOOM_FILTER_BITS_PER_KEY =
+            key("rocksdb.bloom-filter.bits-per-key")
+                    .doubleType()
+                    .defaultValue(10.0)
+                    .withDescription(
+                            "Bits per key that bloom filter will use, this 
only take effect when bloom filter is used. "
+                                    + "The default value is 10.0.");
+
+    public static final ConfigOption<Boolean> BLOOM_FILTER_BLOCK_BASED_MODE =
+            key("rocksdb.bloom-filter.block-based-mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used. "
+                                    + "The default value is 'false'.");
+
+    public static DBOptions createDBOptions(DBOptions currentOptions, 
Configuration options) {
+        
currentOptions.setMaxBackgroundJobs(options.get(MAX_BACKGROUND_THREADS));
+        currentOptions.setMaxOpenFiles(options.get(MAX_OPEN_FILES));
+        currentOptions.setInfoLogLevel(options.get(LOG_LEVEL));
+
+        String logDir = options.get(LOG_DIR);
+        if (logDir == null || logDir.isEmpty()) {
+            relocateDefaultDbLogDir(currentOptions);
+        } else {
+            currentOptions.setDbLogDir(logDir);
+        }
+
+        
currentOptions.setMaxLogFileSize(options.get(LOG_MAX_FILE_SIZE).getBytes());
+        currentOptions.setKeepLogFileNum(options.get(LOG_FILE_NUM));
+        return currentOptions;
+    }
+
+    /**
+     * Relocates the default log directory of RocksDB to the Flink log directory. Finds the
+     * Flink log directory using the log.file Java property that is set during startup.
+     *
+     * @param dbOptions The RocksDB {@link DBOptions}.
+     */
+    private static void relocateDefaultDbLogDir(DBOptions dbOptions) {
+        String logFilePath = System.getProperty("log.file");
+        if (logFilePath != null) {
+            File logFile = resolveFileLocation(logFilePath);
+            if (logFile != null && resolveFileLocation(logFile.getParent()) != 
null) {
+                dbOptions.setDbLogDir(logFile.getParent());
+            }
+        }
+    }
+
+    /**
+     * Verify log file location.
+     *
+     * @param logFilePath Path to log file
+     * @return File or null if not a valid log file
+     */
+    private static File resolveFileLocation(String logFilePath) {
+        File logFile = new File(logFilePath);
+        return (logFile.exists() && logFile.canRead()) ? logFile : null;
+    }
+
+    public static ColumnFamilyOptions createColumnOptions(
+            ColumnFamilyOptions currentOptions, Configuration options) {
+        currentOptions.setCompressionType(options.get(COMPRESSION_TYPE));
+        currentOptions.setCompactionStyle(options.get(COMPACTION_STYLE));
+        
currentOptions.setLevelCompactionDynamicLevelBytes(options.get(USE_DYNAMIC_LEVEL_SIZE));
+        
currentOptions.setTargetFileSizeBase(options.get(TARGET_FILE_SIZE_BASE).getBytes());
+        
currentOptions.setMaxBytesForLevelBase(options.get(MAX_SIZE_LEVEL_BASE).getBytes());
+        
currentOptions.setWriteBufferSize(options.get(WRITE_BUFFER_SIZE).getBytes());
+        
currentOptions.setMaxWriteBufferNumber(options.get(MAX_WRITE_BUFFER_NUMBER));
+        currentOptions.setMinWriteBufferNumberToMerge(
+                options.get(MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+
+        BlockBasedTableConfig blockBasedTableConfig;
+        if (tableFormatConfig == null) {
+            blockBasedTableConfig = new BlockBasedTableConfig();
+        } else {
+            if (tableFormatConfig instanceof PlainTableConfig) {
+                // if the table format config is PlainTableConfig, we just 
return current
+                // column-family options
+                return currentOptions;
+            } else {
+                blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
+            }
+        }
+
+        blockBasedTableConfig.setBlockSize(options.get(BLOCK_SIZE).getBytes());
+        
blockBasedTableConfig.setMetadataBlockSize(options.get(METADATA_BLOCK_SIZE).getBytes());
+        
blockBasedTableConfig.setBlockCacheSize(options.get(BLOCK_CACHE_SIZE).getBytes());
+
+        if (options.get(USE_BLOOM_FILTER)) {
+            double bitsPerKey = options.get(BLOOM_FILTER_BITS_PER_KEY);
+            boolean blockBasedMode = 
options.get(BLOOM_FILTER_BLOCK_BASED_MODE);
+            BloomFilter bloomFilter = new BloomFilter(bitsPerKey, 
blockBasedMode);
+            blockBasedTableConfig.setFilterPolicy(bloomFilter);
+        }
+
+        return currentOptions.setTableFormatConfig(blockBasedTableConfig);
+    }
+}
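
A sketch of how these options get applied, using only the two factory methods above. The configured values are arbitrary examples, and the RocksDB objects are created fresh here purely for illustration (in real code these native handles should eventually be closed):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.store.connector.RocksDBOptions;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class RocksDBOptionsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(RocksDBOptions.BLOCK_CACHE_SIZE, MemorySize.parse("16mb"));
        conf.set(RocksDBOptions.USE_BLOOM_FILTER, true);

        // Apply the configured values on top of freshly created RocksDB options.
        DBOptions dbOptions = RocksDBOptions.createDBOptions(new DBOptions(), conf);
        ColumnFamilyOptions cfOptions =
                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf);
    }
}
```
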
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
new file mode 100644
index 00000000..92e663e9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.TableStreamingReader;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.store.connector.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A lookup {@link TableFunction} for file store. */
+public class FileStoreLookupFunction extends TableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileStoreLookupFunction.class);
+
+    private final FileStoreTable table;
+    private final List<String> projectFields;
+    private final List<String> joinKeys;
+    @Nullable private final Predicate predicate;
+
+    private transient Duration refreshInterval;
+    private transient File path;
+    private transient RocksDBStateFactory stateFactory;
+    private transient LookupTable lookupTable;
+
+    // timestamp when cache expires
+    private transient long nextLoadTime;
+    private transient TableStreamingReader streamingReader;
+
+    public FileStoreLookupFunction(
+            FileStoreTable table,
+            int[] projection,
+            int[] joinKeyIndex,
+            @Nullable Predicate predicate) {
+        TableSchema schema = table.schema();
+        checkArgument(
+                schema.partitionKeys().isEmpty(), "Currently only support 
non-partitioned table.");
+        checkArgument(schema.primaryKeys().size() > 0, "Currently only support 
primary key table.");
+        this.table = table;
+
+        // join keys are based on projection fields
+        this.joinKeys =
+                Arrays.stream(joinKeyIndex)
+                        .mapToObj(i -> schema.fieldNames().get(projection[i]))
+                        .collect(Collectors.toList());
+
+        this.projectFields =
+                Arrays.stream(projection)
+                        .mapToObj(i -> schema.fieldNames().get(i))
+                        .collect(Collectors.toList());
+
+        // add primary keys
+        for (String field : schema.primaryKeys()) {
+            if (!projectFields.contains(field)) {
+                projectFields.add(field);
+            }
+        }
+
+        this.predicate = predicate;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        String tmpDirectory = getTmpDirectory(context);
+        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
+
+        Configuration options = 
Configuration.fromMap(table.schema().options());
+        this.refreshInterval = 
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
+        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+
+        List<String> fieldNames = 
table.schema().logicalRowType().getFieldNames();
+        int[] projection = 
projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
+        RowType rowType = TypeUtils.project(table.schema().logicalRowType(), 
projection);
+
+        PredicateFilter recordFilter = createRecordFilter(projection);
+        this.lookupTable =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        table.schema().primaryKeys(),
+                        joinKeys,
+                        recordFilter,
+                        options.getLong(LOOKUP_CACHE_ROWS));
+        this.nextLoadTime = -1;
+        this.streamingReader = new TableStreamingReader(table, projection, 
this.predicate);
+
+        // do first load
+        refresh();
+    }
+
+    private PredicateFilter createRecordFilter(int[] projection) {
+        Predicate adjustedPredicate = null;
+        if (predicate != null) {
+            // adjust to projection index
+            adjustedPredicate =
+                    transformFieldMapping(
+                                    this.predicate,
+                                    IntStream.range(0, 
table.schema().fields().size())
+                                            .map(i -> Ints.indexOf(projection, 
i))
+                                            .toArray())
+                            .orElse(null);
+        }
+        return new PredicateFilter(
+                TypeUtils.project(table.schema().logicalRowType(), 
projection), adjustedPredicate);
+    }
+
+    /** Used by code generation. */
+    @SuppressWarnings("unused")
+    public void eval(Object... values) throws IOException {
+        checkRefresh();
+        List<RowData> results = lookupTable.get(GenericRowData.of(values));
+        for (RowData matchedRow : results) {
+            collect(matchedRow);
+        }
+    }
+
+    private void checkRefresh() throws IOException {
+        if (nextLoadTime > System.currentTimeMillis()) {
+            return;
+        }
+        if (nextLoadTime > 0) {
+            LOG.info(
+                    "Lookup table has refreshed after {} minute(s), 
refreshing",
+                    refreshInterval.toMinutes());
+        }
+
+        refresh();
+
+        nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
+    }
+
+    private void refresh() throws IOException {
+        while (true) {
+            Iterator<RowData> batch = streamingReader.nextBatch();
+            if (batch == null) {
+                return;
+            }
+            this.lookupTable.refresh(batch);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (stateFactory != null) {
+            stateFactory.close();
+            stateFactory = null;
+        }
+
+        if (path != null) {
+            FileUtils.deleteDirectoryQuietly(path);
+        }
+    }
+
+    private static String getTmpDirectory(FunctionContext context) {
+        try {
+            Field field = context.getClass().getDeclaredField("context");
+            field.setAccessible(true);
+            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) 
field.get(context);
+            String[] tmpDirectories =
+                    
runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
+            return 
tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java
new file mode 100644
index 00000000..ae829a43
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A lookup table which provides get and refresh. */
+public interface LookupTable {
+
+    List<RowData> get(RowData key) throws IOException;
+
+    void refresh(Iterator<RowData> incremental) throws IOException;
+
+    static LookupTable create(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            List<String> joinKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
+            return new PrimaryKeyLookupTable(
+                    stateFactory, rowType, joinKey, recordFilter, 
lruCacheSize);
+        } else {
+            return new SecondaryIndexLookupTable(
+                    stateFactory, rowType, primaryKey, joinKey, recordFilter, 
lruCacheSize);
+        }
+    }
+}
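
The factory chooses the implementation from the key layout: when the join key equals the primary key, at most one row per key needs to be stored; otherwise a secondary index is required. A sketch under the assumption that a local RocksDB directory such as `/tmp/lookup-example` can be created; row type, path, and values are illustrative:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.lookup.LookupTable;
import org.apache.flink.table.store.connector.lookup.RocksDBStateFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.Collections;
import java.util.List;

public class LookupTableExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical row type: (id INT, country STRING), primary key = id.
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                        new String[] {"id", "country"});

        // Local RocksDB working directory for the cache (illustrative path).
        RocksDBStateFactory stateFactory =
                new RocksDBStateFactory("/tmp/lookup-example", new Configuration());

        // Join key == primary key -> PrimaryKeyLookupTable; a different join key
        // (e.g. Collections.singletonList("country")) would yield SecondaryIndexLookupTable.
        LookupTable byId =
                LookupTable.create(
                        stateFactory,
                        rowType,
                        Collections.singletonList("id"),
                        Collections.singletonList("id"),
                        row -> true,
                        10_000L);

        // Lookup with a key row; the result is empty until refresh() has loaded data.
        List<RowData> hits = byId.get(GenericRowData.of(42));
    }
}
```
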
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java
new file mode 100644
index 00000000..e6796162
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.store.utils.KeyProjectedRowData;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A {@link LookupTable} for primary key table. */
+public class PrimaryKeyLookupTable implements LookupTable {
+
+    protected final RocksDBValueState tableState;
+
+    protected final Predicate<RowData> recordFilter;
+
+    protected int[] primaryKeyMapping;
+
+    protected final KeyProjectedRowData primaryKey;
+
+    public PrimaryKeyLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        List<String> fieldNames = rowType.getFieldNames();
+        this.primaryKeyMapping = primaryKey.stream().mapToInt(fieldNames::indexOf).toArray();
+        this.primaryKey = new KeyProjectedRowData(primaryKeyMapping);
+        this.tableState =
+                stateFactory.valueState(
+                        "table",
+                        InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
+                        InternalSerializers.create(rowType),
+                        lruCacheSize);
+        this.recordFilter = recordFilter;
+    }
+
+    @Override
+    public List<RowData> get(RowData key) throws IOException {
+        RowData value = tableState.get(key);
+        return value == null ? Collections.emptyList() : Collections.singletonList(value);
+    }
+
+    @Override
+    public void refresh(Iterator<RowData> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            RowData row = incremental.next();
+            primaryKey.replaceRow(row);
+            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+                if (recordFilter.test(row)) {
+                    tableState.put(primaryKey, row);
+                } else {
+                    // The new record under the primary key is filtered out.
+                    // We need to delete this primary key as it no longer exists.
+                    tableState.delete(primaryKey);
+                }
+            } else {
+                tableState.delete(primaryKey);
+            }
+        }
+    }
+}
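
A brief sketch of the changelog semantics that `refresh` above expects: `+I`/`+U` rows upsert the primary key (or delete it when the new value no longer passes the filter), while `-U`/`-D` rows delete it. The rows below are illustrative only.

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import java.util.Arrays;
import java.util.Iterator;

public final class RefreshChangelogSketch {
    public static void main(String[] args) {
        GenericRowData insert = GenericRowData.of(1, 11, 111); // default kind is +I
        GenericRowData updateAfter = GenericRowData.of(1, 22, 222);
        updateAfter.setRowKind(RowKind.UPDATE_AFTER);           // +U overwrites key 1
        GenericRowData delete = GenericRowData.of(1, 22, 222);
        delete.setRowKind(RowKind.DELETE);                       // -D removes key 1
        Iterator<RowData> incremental =
                Arrays.<RowData>asList(insert, updateAfter, delete).iterator();
        // Feeding this iterator to PrimaryKeyLookupTable#refresh applies the rows in order.
        incremental.forEachRemaining(row -> System.out.println(row.getRowKind()));
    }
}
```
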
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java
new file mode 100644
index 00000000..99dc6341
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** RocksDB state for key -> set of values. */
+public class RocksDBSetState extends RocksDBState<List<byte[]>> {
+
+    private static final byte[] EMPTY = new byte[0];
+
+    public RocksDBSetState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public List<RowData> get(RowData key) throws IOException {
+        ByteArray keyBytes = wrap(serializeKey(key));
+        List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
+        if (valueBytes == null) {
+            valueBytes = new ArrayList<>();
+            try (RocksIterator iterator = db.newIterator(columnFamily)) {
+                iterator.seek(keyBytes.bytes);
+
+                while (iterator.isValid() && startWithKeyPrefix(keyBytes.bytes, iterator.key())) {
+                    byte[] rawKeyBytes = iterator.key();
+                    byte[] value =
+                            Arrays.copyOfRange(
+                                    rawKeyBytes, keyBytes.bytes.length, rawKeyBytes.length);
+                    valueBytes.add(value);
+                    iterator.next();
+                }
+            }
+            cache.put(keyBytes, valueBytes);
+        }
+
+        List<RowData> values = new ArrayList<>(valueBytes.size());
+        for (byte[] value : valueBytes) {
+            valueInputView.setBuffer(value);
+            values.add(valueSerializer.deserialize(valueInputView));
+        }
+        return values;
+    }
+
+    public void retract(RowData key, RowData value) throws IOException {
+        try {
+            byte[] bytes = invalidKeyAndGetKVBytes(key, value);
+            if (db.get(columnFamily, bytes) != null) {
+                db.delete(columnFamily, writeOptions, bytes);
+            }
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void add(RowData key, RowData value) throws IOException {
+        try {
+            byte[] bytes = invalidKeyAndGetKVBytes(key, value);
+            db.put(columnFamily, writeOptions, bytes, EMPTY);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private byte[] invalidKeyAndGetKVBytes(RowData key, RowData value) throws IOException {
+        checkArgument(value != null);
+
+        keyOutView.clear();
+        keySerializer.serialize(key, keyOutView);
+
+        // It is hard to maintain the cached set incrementally, so just invalidate the key.
+        cache.invalidate(wrap(keyOutView.getCopyOfBuffer()));
+
+        valueSerializer.serialize(value, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+        if (rawKeyBytes.length < keyPrefixBytes.length) {
+            return false;
+        }
+
+        for (int i = keyPrefixBytes.length; --i >= 0; ) {
+            if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
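
The set state relies on a composed-key layout: the RocksDB key is `serialize(key) ++ serialize(value)` and the stored value is empty, so `get` becomes a prefix scan. A self-contained sketch of that layout (plain byte arrays, no RocksDB involved):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class SetStateLayoutSketch {

    // Compose rocksKey = keyBytes ++ valueBytes, mirroring invalidKeyAndGetKVBytes above.
    static byte[] concat(byte[] key, byte[] value) {
        byte[] composed = Arrays.copyOf(key, key.length + value.length);
        System.arraycopy(value, 0, composed, key.length, value.length);
        return composed;
    }

    // Same prefix check as startWithKeyPrefix above.
    static boolean startsWith(byte[] prefix, byte[] rawKey) {
        if (rawKey.length < prefix.length) {
            return false;
        }
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (rawKey[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        byte[] composed = concat(key, "v7".getBytes(StandardCharsets.UTF_8));
        // A seek to the key prefix plus startsWith checks enumerates every value of the set;
        // each value is recovered as composed[key.length ..].
        System.out.println(startsWith(key, composed)); // true
        System.out.println(
                new String(
                        Arrays.copyOfRange(composed, key.length, composed.length),
                        StandardCharsets.UTF_8)); // v7
    }
}
```
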
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java
new file mode 100644
index 00000000..67ddb9fd
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Base class for RocksDB-backed key-value states with an LRU cache. */
+public abstract class RocksDBState<CacheV> {
+
+    protected final RocksDB db;
+
+    protected final WriteOptions writeOptions;
+
+    protected final ColumnFamilyHandle columnFamily;
+
+    protected final TypeSerializer<RowData> keySerializer;
+
+    protected final TypeSerializer<RowData> valueSerializer;
+
+    protected final DataOutputSerializer keyOutView;
+
+    protected final DataInputDeserializer valueInputView;
+
+    protected final DataOutputSerializer valueOutputView;
+
+    protected final Cache<ByteArray, CacheV> cache;
+
+    public RocksDBState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        this.db = db;
+        this.columnFamily = columnFamily;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyOutView = new DataOutputSerializer(32);
+        this.valueInputView = new DataInputDeserializer();
+        this.valueOutputView = new DataOutputSerializer(32);
+        this.writeOptions = new WriteOptions().setDisableWAL(true);
+        this.cache = CacheBuilder.newBuilder().maximumSize(lruCacheSize).build();
+    }
+
+    protected byte[] serializeKey(RowData key) throws IOException {
+        keyOutView.clear();
+        keySerializer.serialize(key, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    protected ByteArray wrap(byte[] bytes) {
+        return new ByteArray(bytes);
+    }
+
+    protected Reference ref(byte[] bytes) {
+        return new Reference(bytes);
+    }
+
+    /** A class that wraps a byte[] to implement value-based equals and hashCode. */
+    protected static class ByteArray {
+
+        protected final byte[] bytes;
+
+        protected ByteArray(byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(bytes);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ByteArray byteArray = (ByteArray) o;
+            return Arrays.equals(bytes, byteArray.bytes);
+        }
+    }
+
+    /** A class that wraps a nullable byte[] to indicate whether a value is present. */
+    protected static class Reference {
+
+        @Nullable protected final byte[] bytes;
+
+        protected Reference(@Nullable byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        public boolean isPresent() {
+            return bytes != null;
+        }
+    }
+}
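
The `ByteArray` wrapper exists because `byte[]` only has identity `equals`/`hashCode`, so raw arrays cannot serve as cache keys. A small standalone sketch of the difference (the local `ByteArray` below mirrors the inner class above):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public final class ByteArrayKeySketch {
    public static void main(String[] args) {
        Map<byte[], String> raw = new HashMap<>();
        raw.put(new byte[] {1, 2}, "value");
        // byte[] uses identity hashCode/equals, so an equal-content array misses.
        System.out.println(raw.get(new byte[] {1, 2})); // null

        Map<ByteArray, String> wrapped = new HashMap<>();
        wrapped.put(new ByteArray(new byte[] {1, 2}), "value");
        // The wrapper delegates to Arrays.hashCode/Arrays.equals, so lookups hit.
        System.out.println(wrapped.get(new ByteArray(new byte[] {1, 2}))); // value
    }

    // Local stand-in mirroring RocksDBState.ByteArray.
    static final class ByteArray {
        final byte[] bytes;

        ByteArray(byte[] bytes) {
            this.bytes = bytes;
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ByteArray && Arrays.equals(bytes, ((ByteArray) o).bytes);
        }
    }
}
```
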
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java
new file mode 100644
index 00000000..1ece9ae0
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.RocksDBOptions;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** Factory to create RocksDB-backed states, one column family per state. */
+public class RocksDBStateFactory implements Closeable {
+
+    private RocksDB db;
+
+    private final ColumnFamilyOptions columnFamilyOptions;
+
+    public RocksDBStateFactory(String path, Configuration conf) throws IOException {
+        DBOptions dbOptions =
+                RocksDBOptions.createDBOptions(
+                        new DBOptions()
+                                .setUseFsync(false)
+                                .setStatsDumpPeriodSec(0)
+                                .setCreateIfMissing(true),
+                        conf);
+        this.columnFamilyOptions =
+                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf);
+
+        try {
+            this.db = RocksDB.open(new Options(dbOptions, columnFamilyOptions), path);
+        } catch (RocksDBException e) {
+            throw new IOException("Error while opening RocksDB instance.", e);
+        }
+    }
+
+    public RocksDBValueState valueState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBValueState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public RocksDBSetState setState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBSetState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    private ColumnFamilyHandle createColumnFamily(String name) throws IOException {
+        try {
+            return db.createColumnFamily(
+                    new ColumnFamilyDescriptor(
+                            name.getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (db != null) {
+            db.close();
+            db = null;
+        }
+    }
+}
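
A hedged usage sketch for the factory: it owns a single RocksDB instance, each named state gets its own column family, and closing the factory releases the database. The directory, state names, and row types below are illustrative.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.connector.lookup.RocksDBSetState;
import org.apache.flink.table.store.connector.lookup.RocksDBStateFactory;
import org.apache.flink.table.store.connector.lookup.RocksDBValueState;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

import java.nio.file.Files;

public final class StateFactorySketch {
    public static void main(String[] args) throws Exception {
        String path = Files.createTempDirectory("rocksdb-sketch").toString();
        RowType keyType = RowType.of(new IntType());
        RowType valueType = RowType.of(new IntType(), new IntType());
        try (RocksDBStateFactory factory = new RocksDBStateFactory(path, new Configuration())) {
            // One column family per named state.
            RocksDBValueState valueState =
                    factory.valueState(
                            "table",
                            InternalSerializers.create(keyType),
                            InternalSerializers.create(valueType),
                            1000);
            RocksDBSetState setState =
                    factory.setState(
                            "sec-index",
                            InternalSerializers.create(keyType),
                            InternalSerializers.create(keyType),
                            1000);
            System.out.println(valueState != null && setState != null); // true
        }
    }
}
```
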
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java
new file mode 100644
index 00000000..7c601c51
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** RocksDB state for key -> a single value. */
+public class RocksDBValueState extends RocksDBState<RocksDBState.Reference> {
+
+    public RocksDBValueState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    @Nullable
+    public RowData get(RowData key) throws IOException {
+        try {
+            Reference valueRef = get(wrap(serializeKey(key)));
+            return valueRef.isPresent() ? deserializeValue(valueRef.bytes) : null;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Reference get(ByteArray keyBytes) throws RocksDBException {
+        Reference valueRef = cache.getIfPresent(keyBytes);
+        if (valueRef == null) {
+            valueRef = ref(db.get(columnFamily, keyBytes.bytes));
+            cache.put(keyBytes, valueRef);
+        }
+
+        return valueRef;
+    }
+
+    public void put(RowData key, RowData value) throws IOException {
+        checkArgument(value != null);
+
+        try {
+            byte[] keyBytes = serializeKey(key);
+            byte[] valueBytes = serializeValue(value);
+            db.put(columnFamily, writeOptions, keyBytes, valueBytes);
+            cache.put(wrap(keyBytes), ref(valueBytes));
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void delete(RowData key) throws IOException {
+        try {
+            byte[] keyBytes = serializeKey(key);
+            ByteArray keyByteArray = wrap(keyBytes);
+            if (get(keyByteArray).isPresent()) {
+                db.delete(columnFamily, writeOptions, keyBytes);
+                cache.put(keyByteArray, ref(null));
+            }
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private RowData deserializeValue(byte[] valueBytes) throws IOException {
+        valueInputView.setBuffer(valueBytes);
+        return valueSerializer.deserialize(valueInputView);
+    }
+
+    private byte[] serializeValue(RowData value) throws IOException {
+        valueOutputView.clear();
+        valueSerializer.serialize(value, valueOutputView);
+        return valueOutputView.getCopyOfBuffer();
+    }
+}
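
The value state caches a `Reference` rather than raw bytes so that a RocksDB miss ("known absent") can be cached as well, avoiding repeated lookups for the same key. A standalone sketch of that negative-caching idea (names are local stand-ins, not the classes above):

```java
import java.util.HashMap;
import java.util.Map;

public final class NegativeCacheSketch {

    static final class Reference {
        final byte[] bytes; // null means "looked up and not found"

        Reference(byte[] bytes) {
            this.bytes = bytes;
        }

        boolean isPresent() {
            return bytes != null;
        }
    }

    public static void main(String[] args) {
        Map<String, Reference> cache = new HashMap<>();
        cache.put("k1", new Reference(new byte[] {42})); // value cached
        cache.put("k2", new Reference(null));            // absence cached

        // No cache entry ("k3") means the store must still be queried;
        // a cached Reference with null bytes means the lookup can be skipped.
        System.out.println(cache.get("k1").isPresent()); // true
        System.out.println(cache.get("k2").isPresent()); // false
        System.out.println(cache.get("k3"));             // null -> go to the store
    }
}
```
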
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java
new file mode 100644
index 00000000..f2b03712
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.store.utils.KeyProjectedRowData;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A {@link LookupTable} for a primary key table that provides lookup by a secondary key. */
+public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
+
+    private final RocksDBSetState indexState;
+
+    private final KeyProjectedRowData secKeyRow;
+
+    public SecondaryIndexLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            List<String> secKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2);
+        List<String> fieldNames = rowType.getFieldNames();
+        int[] secKeyMapping = secKey.stream().mapToInt(fieldNames::indexOf).toArray();
+        this.secKeyRow = new KeyProjectedRowData(secKeyMapping);
+        this.indexState =
+                stateFactory.setState(
+                        "sec-index",
+                        InternalSerializers.create(TypeUtils.project(rowType, secKeyMapping)),
+                        InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
+                        lruCacheSize / 2);
+    }
+
+    @Override
+    public List<RowData> get(RowData key) throws IOException {
+        List<RowData> pks = indexState.get(key);
+        List<RowData> values = new ArrayList<>(pks.size());
+        for (RowData pk : pks) {
+            RowData value = tableState.get(pk);
+            if (value != null) {
+                values.add(value);
+            }
+        }
+        return values;
+    }
+
+    @Override
+    public void refresh(Iterator<RowData> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            RowData row = incremental.next();
+            primaryKey.replaceRow(row);
+            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+                RowData previous = tableState.get(primaryKey);
+                if (previous != null) {
+                    indexState.retract(secKeyRow.replaceRow(previous), primaryKey);
+                }
+
+                if (recordFilter.test(row)) {
+                    tableState.put(primaryKey, row);
+                    indexState.add(secKeyRow.replaceRow(row), primaryKey);
+                } else {
+                    tableState.delete(primaryKey);
+                }
+            } else {
+                tableState.delete(primaryKey);
+                indexState.retract(secKeyRow.replaceRow(row), primaryKey);
+            }
+        }
+    }
+}
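
A usage sketch for the secondary-index path (not part of this commit): when the join key differs from the primary key, `LookupTable.create` returns this class and a single lookup may yield several rows. Field names and values below are illustrative.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.connector.lookup.LookupTable;
import org.apache.flink.table.store.connector.lookup.RocksDBStateFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public final class SecondaryIndexSketch {
    public static void main(String[] args) throws Exception {
        RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
        String path = Files.createTempDirectory("lookup-sketch").toString();
        try (RocksDBStateFactory stateFactory = new RocksDBStateFactory(path, new Configuration())) {
            LookupTable table =
                    LookupTable.create(
                            stateFactory,
                            rowType,
                            Collections.singletonList("f0"), // primary key
                            Collections.singletonList("f1"), // join key -> secondary index
                            row -> true,
                            10_000);
            table.refresh(
                    Arrays.<RowData>asList(
                                    GenericRowData.of(1, 22, 111), GenericRowData.of(2, 22, 222))
                            .iterator());
            // Two primary keys share the secondary key 22, so the lookup returns both rows.
            List<RowData> rows = table.get(GenericRowData.of(22));
            System.out.println(rows.size()); // 2
        }
    }
}
```
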
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 8a8762c7..f6fb9ac4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -21,8 +21,9 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
 import org.apache.flink.table.store.table.source.TableScan;
 
 import org.slf4j.Logger;
@@ -40,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -54,10 +54,6 @@ public class ContinuousFileSplitEnumerator
 
     private final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
-    private final TableScan scan;
-
-    private final SnapshotManager snapshotManager;
-
     private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
 
     private final long discoveryInterval;
@@ -72,22 +68,20 @@ public class ContinuousFileSplitEnumerator
 
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Path location,
             TableScan scan,
-            SnapshotManager snapshotManager,
             Collection<FileStoreSourceSplit> remainSplits,
             long currentSnapshotId,
             long discoveryInterval) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
-        this.scan = checkNotNull(scan);
-        this.snapshotManager = snapshotManager;
         this.bucketSplits = new HashMap<>();
         addSplits(remainSplits);
         this.currentSnapshotId = currentSnapshotId;
         this.discoveryInterval = discoveryInterval;
         this.readersAwaitingSplit = new HashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
-        this.snapshotEnumerator = new SnapshotEnumerator(currentSnapshotId);
+        this.snapshotEnumerator = new SnapshotEnumerator(location, scan, currentSnapshotId);
     }
 
     private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -159,7 +153,7 @@ public class ContinuousFileSplitEnumerator
         }
 
         currentSnapshotId = result.snapshotId;
-        addSplits(result.splits);
+        addSplits(splitGenerator.createSplits(result.plan));
         assignSplits();
     }
 
@@ -182,62 +176,4 @@ public class ContinuousFileSplitEnumerator
                     }
                 });
     }
-
-    private class SnapshotEnumerator implements Callable<EnumeratorResult> {
-
-        private long nextSnapshotId;
-
-        private SnapshotEnumerator(long currentSnapshot) {
-            this.nextSnapshotId = currentSnapshot + 1;
-        }
-
-        @Nullable
-        @Override
-        public EnumeratorResult call() {
-            // TODO sync with processDiscoveredSplits to avoid too more splits in memory
-            while (true) {
-                if (!snapshotManager.snapshotExists(nextSnapshotId)) {
-                    // TODO check latest snapshot id, expired?
-                    LOG.debug(
-                            "Next snapshot id {} not exists, wait for it to be generated.",
-                            nextSnapshotId);
-                    return null;
-                }
-
-                Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-                if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
-                    if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
-                        LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
-                    }
-
-                    nextSnapshotId++;
-                    LOG.debug(
-                            "Next snapshot id {} is not append, but is {}, check next one.",
-                            nextSnapshotId,
-                            snapshot.commitKind());
-                    continue;
-                }
-
-                List<FileStoreSourceSplit> splits =
-                        splitGenerator.createSplits(scan.withSnapshot(nextSnapshotId).plan());
-                EnumeratorResult result = new EnumeratorResult(nextSnapshotId, splits);
-                LOG.debug("Find snapshot id {}, it has {} splits.", nextSnapshotId, splits.size());
-
-                nextSnapshotId++;
-                return result;
-            }
-        }
-    }
-
-    private static class EnumeratorResult {
-
-        private final long snapshotId;
-
-        private final List<FileStoreSourceSplit> splits;
-
-        private EnumeratorResult(long snapshotId, List<FileStoreSourceSplit> splits) {
-            this.snapshotId = snapshotId;
-            this.splits = splits;
-        }
-    }
 }
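
The relocated `SnapshotEnumerator` is a `Callable` that returns `null` while no new append snapshot is available. The sketch below illustrates that polling contract with a plain scheduled executor; the real wiring uses the split enumerator context's async callback, and the names here are stand-ins.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public final class PollingContractSketch {
    public static void main(String[] args) throws Exception {
        AtomicLong nextSnapshotId = new AtomicLong(1);
        // Stand-in for SnapshotEnumerator: null means "nothing new yet, try again later".
        Callable<Long> enumerator =
                () -> nextSnapshotId.get() <= 3 ? nextSnapshotId.getAndIncrement() : null;

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> {
                    try {
                        Long result = enumerator.call();
                        if (result != null) {
                            System.out.println("discovered snapshot " + result);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                },
                0,
                100,
                TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        executor.shutdownNow();
    }
}
```
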
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index b54cbde1..71742cb0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -139,8 +139,8 @@ public class FileStoreSource
             long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
             return new ContinuousFileSplitEnumerator(
                     context,
+                    table.location(),
                     scan.withIncremental(true), // the subsequent planning is all incremental
-                    snapshotManager,
                     splits,
                     currentSnapshot,
                     discoveryInterval);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 3404308a..68a976b0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -36,6 +38,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
+import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
@@ -45,12 +48,14 @@ import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
@@ -65,6 +70,7 @@ import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE
  */
 public class TableStoreSource
         implements ScanTableSource,
+                LookupTableSource,
                 SupportsFilterPushDown,
                 SupportsProjectionPushDown,
                 SupportsLimitPushDown,
@@ -229,4 +235,19 @@ public class TableStoreSource
     public void applyLimit(long limit) {
         this.limit = limit;
     }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        if (limit != null) {
+            throw new RuntimeException(
+                    "Limit push down should not happen in Lookup source, but it is " + limit);
+        }
+        int[] projection =
+                projectFields == null
+                        ? IntStream.range(0, table.schema().fields().size()).toArray()
+                        : Projection.of(projectFields).toTopLevelIndexes();
+        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
+        return TableFunctionProvider.of(
+                new FileStoreLookupFunction(table, projection, joinKey, predicate));
+    }
 }
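
From SQL, the new lookup path is triggered by a processing-time temporal join, as the ITCase below exercises at length. A condensed, hypothetical sketch (catalog name, warehouse path, and table layout are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class LookupJoinSqlSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        env.executeSql(
                "CREATE CATALOG my_catalog WITH ('type'='table-store', 'warehouse'='/tmp/warehouse')");
        env.executeSql("USE CATALOG my_catalog");
        env.executeSql("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
        env.executeSql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT)");
        // FOR SYSTEM_TIME AS OF routes DIM through getLookupRuntimeProvider above.
        // Blocks and prints results as rows arrive in T.
        env.executeSql(
                        "SELECT T.i, D.j FROM T LEFT JOIN DIM "
                                + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i")
                .print();
    }
}
```
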
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..b5cf0013
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends AbstractTestBase {
+
+    private TableEnvironment env;
+
+    @Before
+    public void before() throws Exception {
+        env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        env.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        env.executeSql(
+                String.format(
+                        "CREATE CATALOG my_catalog WITH ('type'='table-store', 'warehouse'='%s')",
+                        path));
+        executeSql("USE CATALOG my_catalog");
+        executeSql("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
+        executeSql(
+                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms')");
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupProjection() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 
null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterPk() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.i > 2";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, null, null), 
Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, null, null),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterSelect() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, 
null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterUnSelect() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, 
null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterUnSelectAndUpdate() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 
null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, null, null),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookup() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.j";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, 22, 333, 3333),
+                        Row.of(null, 33, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(null, 22, null, null),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(2, 44, 444, 4444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupProjection() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), 
Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, null), Row.of(33, 333), 
Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterPk() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.i > 2";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(11, null), Row.of(22, 333), 
Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), 
Row.of(44, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterSelect() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k1 > 111";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, 222), Row.of(22, 333), 
Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), 
Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterUnSelect() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, 222), Row.of(22, 333), 
Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), 
Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), 
Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, null), Row.of(33, 333), 
Row.of(44, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testRepeatRefresh() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 
null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
+        executeSql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait refresh
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    private void executeSql(String sql) throws ExecutionException, 
InterruptedException {
+        env.executeSql(sql).await();
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java
new file mode 100644
index 00000000..b9569a88
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupTable}. */
+public class LookupTableTest {
+
+    @TempDir Path tempDir;
+
+    private RocksDBStateFactory stateFactory;
+
+    private RowType rowType;
+
+    @BeforeEach
+    public void before() throws IOException {
+        this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new Configuration());
+        this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        if (stateFactory != null) {
+            stateFactory.close();
+        }
+    }
+
+    @Test
+    public void testPkTable() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f0"),
+                        r -> r.getInt(0) < 3,
+                        ThreadLocalRandom.current().nextInt(2) * 10);
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+        assertThat(table.get(row(1))).hasSize(0);
+
+        table.refresh(singletonList(row(3, 33, 333)).iterator());
+        assertThat(table.get(row(3))).hasSize(0);
+    }
+
+    @Test
+    public void testPkTableFilter() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f0"),
+                        r -> r.getInt(1) < 22,
+                        ThreadLocalRandom.current().nextInt(2) * 10);
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        result = table.get(row(1));
+        assertThat(result).hasSize(0);
+    }
+
+    @Test
+    public void testSecKeyTable() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f1"),
+                        r -> r.getInt(0) < 3,
+                        ThreadLocalRandom.current().nextInt(2) * 10);
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(11));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator());
+        assertThat(table.get(row(11))).hasSize(0);
+        result = table.get(row(22));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(2, 22, 222)).iterator());
+        result = table.get(row(22));
+        assertThat(result).hasSize(2);
+        assertRow(result.get(0), 1, 22, 222);
+        assertRow(result.get(1), 2, 22, 222);
+
+        table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 222)).iterator());
+        result = table.get(row(22));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(3, 33, 333)).iterator());
+        assertThat(table.get(row(33))).hasSize(0);
+    }
+
+    private static RowData row(Object... values) {
+        return row(RowKind.INSERT, values);
+    }
+
+    private static RowData row(RowKind kind, Object... values) {
+        GenericRowData row = new GenericRowData(kind, values.length);
+
+        for (int i = 0; i < values.length; ++i) {
+            row.setField(i, values[i]);
+        }
+
+        return row;
+    }
+
+    private static void assertRow(RowData resultRow, int... expected) {
+        int[] results = new int[expected.length];
+        for (int i = 0; i < results.length; i++) {
+            results[i] = resultRow.getInt(i);
+        }
+        assertThat(results).containsExactly(expected);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
new file mode 100644
index 00000000..83eae895
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Callable;
+
+/** Enumerator to enumerate incremental snapshots. */
+public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+
+    private final TableScan scan;
+
+    private long nextSnapshotId;
+
+    public SnapshotEnumerator(Path tablePath, TableScan scan, long currentSnapshot) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.nextSnapshotId = currentSnapshot + 1;
+    }
+
+    @Nullable
+    @Override
+    public EnumeratorResult call() {
+        // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                // TODO check latest snapshot id, expired?
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+            if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+                    LOG.warn("Ignore overwrite snapshot id {}.", 
nextSnapshotId);
+                }
+
+                LOG.debug(
+                        "Next snapshot id {} is not APPEND, but is {}, check next one.",
+                        nextSnapshotId,
+                        snapshot.commitKind());
+                nextSnapshotId++;
+                continue;
+            }
+
+            TableScan.Plan plan = scan.withSnapshot(nextSnapshotId).plan();
+            EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan);
+            LOG.debug("Find snapshot id {}.", nextSnapshotId);
+
+            nextSnapshotId++;
+            return result;
+        }
+    }
+
+    /** Enumerator result. */
+    public static class EnumeratorResult {
+
+        public final long snapshotId;
+
+        public final TableScan.Plan plan;
+
+        private EnumeratorResult(long snapshotId, TableScan.Plan plan) {
+            this.snapshotId = snapshotId;
+            this.plan = plan;
+        }
+    }
+}
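
For reference, a minimal polling loop over SnapshotEnumerator could look like the sketch below. It only relies on methods visible in this change; the surrounding FileStoreTable, the starting snapshot id, the poll interval, and the handle(...) consumer are assumptions for illustration, not part of this commit.

    // Sketch only: `table`, `startingSnapshotId` and `handle(...)` are assumed to exist.
    void pollIncrementalSnapshots(FileStoreTable table, long startingSnapshotId) throws Exception {
        SnapshotEnumerator enumerator =
                new SnapshotEnumerator(
                        table.location(), table.newScan().withIncremental(true), startingSnapshotId);
        while (!Thread.currentThread().isInterrupted()) {
            SnapshotEnumerator.EnumeratorResult result = enumerator.call();
            if (result == null) {
                Thread.sleep(1_000L); // no new APPEND snapshot yet: back off, then poll again
                continue;
            }
            handle(result.snapshotId, result.plan); // hand the incremental plan to the caller
        }
    }
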
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
new file mode 100644
index 00000000..58a420e2
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.utils.TypeUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.IntUnaryOperator;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+
+/** A streaming reader to read table. */
+public class TableStreamingReader {
+
+    private final FileStoreTable table;
+    private final int[] projection;
+    @Nullable private final Predicate predicate;
+    @Nullable private final PredicateFilter recordFilter;
+
+    private SnapshotEnumerator enumerator;
+
+    public TableStreamingReader(
+            FileStoreTable table, int[] projection, @Nullable Predicate predicate) {
+        this.table = table;
+        this.projection = projection;
+        this.predicate = predicate;
+        if (predicate != null) {
+            List<String> fieldNames = table.schema().fieldNames();
+            List<String> primaryKeys = table.schema().primaryKeys();
+
+            // for pk table: only filter by pk, the stream is upsert instead of changelog
+            // for non-pk table: filter all
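+            // Illustrative example (values assumed, not from this change): with
+            // projection = {2, 0} and a predicate on field 0, Ints.indexOf(projection, 0)
+            // is 1, so the predicate is rewritten against projected index 1. A field that
+            // is not projected, or a non-pk field of a pk table, maps to -1 and the
+            // predicate cannot be transformed, hence the orElse(null) below.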
+            IntUnaryOperator operator =
+                    i -> {
+                        int index = Ints.indexOf(projection, i);
+                        boolean safeFilter =
+                                primaryKeys.isEmpty() || primaryKeys.contains(fieldNames.get(i));
+                        return safeFilter ? index : -1;
+                    };
+
+            int[] fieldIdxToProjectionIdx =
+                    IntStream.range(0, table.schema().fields().size()).map(operator).toArray();
+
+            this.recordFilter =
+                    new PredicateFilter(
+                            TypeUtils.project(table.schema().logicalRowType(), projection),
+                            transformFieldMapping(predicate, fieldIdxToProjectionIdx).orElse(null));
+        } else {
+            recordFilter = null;
+        }
+    }
+
+    @Nullable
+    public Iterator<RowData> nextBatch() throws IOException {
+        if (enumerator == null) {
+            TableScan scan = table.newScan();
+            if (predicate != null) {
+                scan.withFilter(predicate);
+            }
+            TableScan.Plan plan = scan.plan();
+            long snapshotId = Objects.requireNonNull(plan.snapshotId);
+            enumerator =
+                    new SnapshotEnumerator(
+                            table.location(), scan.withIncremental(true), snapshotId);
+            return read(plan);
+        } else {
+            SnapshotEnumerator.EnumeratorResult result = enumerator.call();
+            if (result == null) {
+                return null;
+            }
+            return read(result.plan);
+        }
+    }
+
+    private Iterator<RowData> read(TableScan.Plan plan) throws IOException {
+        TableRead read = table.newRead().withProjection(projection);
+        if (predicate != null) {
+            read.withFilter(predicate);
+        }
+
+        List<ConcatRecordReader.ReaderSupplier<RowData>> readers = new ArrayList<>();
+        for (Split split : plan.splits) {
+            readers.add(() -> read.createReader(split));
+        }
+        Iterator<RowData> iterator = new RecordReaderIterator<>(ConcatRecordReader.create(readers));
+        if (recordFilter != null) {
+            return Iterators.filter(iterator, recordFilter::test);
+        }
+        return iterator;
+    }
+}
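
A TableStreamingReader instance is meant to be drained periodically by the lookup side. A minimal refresh step, assuming an already-constructed reader and LookupTable (the refresh cadence is left to the caller), might look like the following sketch:

    // Sketch only: `reader` and `lookupTable` are assumed to be built elsewhere.
    void refresh(TableStreamingReader reader, LookupTable lookupTable) throws Exception {
        while (true) {
            Iterator<RowData> batch = reader.nextBatch();
            if (batch == null) {
                return; // caught up with the latest snapshot; try again on the next tick
            }
            lookupTable.refresh(batch); // apply inserts/updates/deletes to the local RocksDB state
        }
    }
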
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index 137232ad..90b1bd4f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -55,7 +55,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class PredicateConverterTest {
 
     private static final PredicateBuilder BUILDER =
-            new PredicateBuilder(RowType.of(new BigIntType(), new DoubleType()));
+            new PredicateBuilder(
+                    new RowType(
+                            Arrays.asList(
+                                    new RowType.RowField("long1", new 
BigIntType()),
+                                    new RowType.RowField("double1", new 
DoubleType()))));
 
     private static final PredicateConverter CONVERTER = new PredicateConverter(BUILDER);
 
@@ -73,7 +77,8 @@ public class PredicateConverterTest {
 
     public static Stream<Arguments> provideResolvedExpression() {
         FieldReferenceExpression longRefExpr =
-                new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 
0);
+                new FieldReferenceExpression(
+                        "long1", DataTypes.BIGINT(), Integer.MAX_VALUE, 
Integer.MAX_VALUE);
         ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
         ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20);
         long longLit = 10L;
@@ -81,7 +86,8 @@ public class PredicateConverterTest {
                 new ValueLiteralExpression(null, DataTypes.BIGINT());
 
         FieldReferenceExpression doubleRefExpr =
-                new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 
1);
+                new FieldReferenceExpression(
+                        "double1", DataTypes.DOUBLE(), Integer.MAX_VALUE, 
Integer.MAX_VALUE);
         ValueLiteralExpression floatLitExpr = new 
ValueLiteralExpression(3.14f);
         double doubleLit = 3.14d;
 
@@ -736,7 +742,7 @@ public class PredicateConverterTest {
     }
 
     private static FieldReferenceExpression field(int i, DataType type) {
-        return new FieldReferenceExpression("name", type, 0, i);
+        return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
     }
 
     private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
