SteNicholas commented on code in PR #281:
URL: https://github.com/apache/flink-table-store/pull/281#discussion_r963226417


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.TableStreamingReader;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.RocksDBOptions.LRU_CACHE_MAX_ROWS;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A lookup {@link TableFunction} for file store. */
+public class FileStoreLookupFunction extends TableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
+
+    private final FileStoreTable table;
+    private final List<String> projectFields;
+    private final List<String> joinKeys;
+    @Nullable private final Predicate predicate;
+
+    private transient Duration refreshInterval;
+    private transient File path;
+    private transient RocksDBStateFactory stateFactory;
+    private transient LookupTable lookupTable;
+
+    // timestamp when cache expires
+    private transient long nextLoadTime;
+    private transient TableStreamingReader streamingReader;
+
+    public FileStoreLookupFunction(
+            FileStoreTable table,
+            int[] projection,
+            int[] joinKeyIndex,
+            @Nullable Predicate predicate) {
+        TableSchema schema = table.schema();
+        checkArgument(
+                schema.partitionKeys().isEmpty(), "Currently only support non-partitioned table.");
+        checkArgument(schema.primaryKeys().size() > 0, "Currently only support primary key table.");
+        this.table = table;
+
+        // join keys are based on projection fields
+        this.joinKeys =
+                Arrays.stream(joinKeyIndex)
+                        .mapToObj(i -> schema.fieldNames().get(projection[i]))
+                        .collect(Collectors.toList());
+
+        this.projectFields =
+                Arrays.stream(projection)
+                        .mapToObj(i -> schema.fieldNames().get(i))
+                        .collect(Collectors.toList());
+
+        // add primary keys
+        for (String field : schema.primaryKeys()) {
+            if (!projectFields.contains(field)) {
+                projectFields.add(field);
+            }
+        }
+
+        this.predicate = predicate;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        String tmpDirectory = getTmpDirectory(context);
+        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
+
+        Configuration options = Configuration.fromMap(table.schema().options());
+        this.refreshInterval = options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
+        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+
+        List<String> fieldNames = table.schema().logicalRowType().getFieldNames();
+        int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
+        RowType rowType = TypeUtils.project(table.schema().logicalRowType(), projection);
+
+        PredicateFilter recordFilter = createRecordFilter(projection);
+        this.lookupTable =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        table.schema().primaryKeys(),
+                        joinKeys,
+                        recordFilter,
+                        options.getLong(LRU_CACHE_MAX_ROWS));
+        this.nextLoadTime = -1;
+        this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
+
+        // do first load
+        refresh();
+    }
+
+    private PredicateFilter createRecordFilter(int[] projection) {
+        Predicate adjustedPredicate = null;
+        if (predicate != null) {
+            // adjust to projection index
+            adjustedPredicate =
+                    transformFieldMapping(
+                                    this.predicate,
+                                    IntStream.range(0, table.schema().fields().size())
+                                            .map(i -> Ints.indexOf(projection, i))
+                                            .toArray())
+                            .orElse(null);
+        }
+        return new PredicateFilter(
+                TypeUtils.project(table.schema().logicalRowType(), projection), adjustedPredicate);
+    }
+
+    /** Used by code generation. */
+    @SuppressWarnings("unused")
+    public void eval(Object... values) throws IOException {
+        checkRefresh();
+        List<RowData> results = lookupTable.get(GenericRowData.of(values));
+        for (RowData matchedRow : results) {
+            collect(matchedRow);
+        }
+    }
+
+    private void checkRefresh() throws IOException {
+        if (nextLoadTime > System.currentTimeMillis()) {
+            return;
+        }
+        if (nextLoadTime > 0) {
+            LOG.info(
+                    "Lookup table has refreshed after {} minute(s), refreshing",
+                    refreshInterval.toMinutes());
+        }
+
+        refresh();
+
+        nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
+    }
+
+    private void refresh() throws IOException {
+        Iterator<RowData> batch = streamingReader.nextBatch();
+        if (batch != null) {
+            this.lookupTable.refresh(batch);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (stateFactory != null) {
+            stateFactory.close();

Review Comment:
   Add `stateFactory = null;` after this line.
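
   For illustration, a minimal sketch of `close()` with that change applied. The directory cleanup via `FileUtils` is an assumption based on the imports above (the diff is truncated here); the rest follows the fields declared in this class:
   ```java
   @Override
   public void close() throws IOException {
       if (stateFactory != null) {
           stateFactory.close();
           // Drop the reference so a repeated close() is a no-op and the
           // native RocksDB resources are not touched again.
           stateFactory = null;
       }
       if (path != null) {
           // Assumed cleanup of the temporary lookup directory.
           FileUtils.deleteDirectoryQuietly(path);
       }
   }
   ```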



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.RocksDBOptions;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** Factory to create state. */
+public class RocksDBStateFactory implements Closeable {
+
+    private final RocksDB db;
+
+    private final ColumnFamilyOptions columnFamilyOptions;
+
+    public RocksDBStateFactory(String path, Configuration conf) throws IOException {
+        DBOptions dbOptions =
+                RocksDBOptions.createDBOptions(
+                        new DBOptions()
+                                .setUseFsync(false)
+                                .setStatsDumpPeriodSec(0)
+                                .setCreateIfMissing(true),
+                        conf);
+        this.columnFamilyOptions =
+                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf);
+
+        try {
+            this.db = RocksDB.open(new Options(dbOptions, columnFamilyOptions), path);
+        } catch (RocksDBException e) {
+            throw new IOException("Error while opening RocksDB instance.", e);
+        }
+    }
+
+    public RocksDBValueState valueState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBValueState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public RocksDBSetState setState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBSetState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    private ColumnFamilyHandle createColumnFamily(String name) throws IOException {
+        try {
+            return db.createColumnFamily(
+                    new ColumnFamilyDescriptor(
+                            name.getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.db.close();

Review Comment:
   Add `db = null;` after this line.
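
   A sketch of the resulting `close()` under that suggestion. Note that `db` is currently declared `private final RocksDB db;`, so the field would have to become non-final for this to compile:
   ```java
   @Override
   public void close() throws IOException {
       if (db != null) {
           db.close();
           // Null out the handle so repeated close() calls are safe and the
           // native instance is not used after release.
           db = null;
       }
   }
   ```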



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Callable;
+
+/** Enumerator to enumerate incremental snapshots. */
+public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+
+    private final TableScan scan;
+
+    private long nextSnapshotId;
+
+    public SnapshotEnumerator(Path tablePath, TableScan scan, long currentSnapshot) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.nextSnapshotId = currentSnapshot + 1;
+    }
+
+    @Nullable
+    @Override
+    public EnumeratorResult call() {
+        // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                // TODO check latest snapshot id, expired?
+                LOG.debug(
+                        "Next snapshot id {} not exists, wait for it to be generated.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+            if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+                    LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
+                }
+
+                nextSnapshotId++;
+                LOG.debug(
+                        "Next snapshot id {} is not append, but is {}, check next one.",

Review Comment:
   ```suggestion
                        "Next snapshot id {} is not APPEND, but is {}, check next one.",
   ```



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Rocksdb state for key -> Set values. */
+public class RocksDBSetState extends RocksDBState<List<byte[]>> {
+
+    private static final byte[] EMPTY = new byte[0];
+
+    public RocksDBSetState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public List<RowData> get(RowData key) throws IOException {
+        ByteArray keyBytes = wrap(serializeKey(key));
+        List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
+        if (valueBytes == null) {
+            valueBytes = new ArrayList<>();
+            try (RocksIterator iterator = db.newIterator(columnFamily)) {
+                iterator.seek(keyBytes.bytes);
+
+                while (iterator.isValid() && startWithKeyPrefix(keyBytes.bytes, iterator.key())) {
+                    byte[] rawKeyBytes = iterator.key();
+                    byte[] value =
+                            Arrays.copyOfRange(
+                                    rawKeyBytes, keyBytes.bytes.length, rawKeyBytes.length);
+                    valueBytes.add(value);
+                    iterator.next();
+                }
+            }
+            cache.put(keyBytes, valueBytes);
+        }
+
+        List<RowData> values = new ArrayList<>(valueBytes.size());
+        for (byte[] value : valueBytes) {
+            valueInputView.setBuffer(value);
+            values.add(valueSerializer.deserialize(valueInputView));
+        }
+        return values;
+    }
+
+    public void retract(RowData key, RowData value) throws IOException {
+        try {
+            byte[] bytes = invalidKeyAndGetKVBytes(key, value);
+            if (db.get(columnFamily, bytes) != null) {
+                db.delete(columnFamily, writeOptions, bytes);

Review Comment:
   Should the cache put the key with a null value?
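
   For illustration, one way to keep the cache consistent here without putting a null value (assuming the cache is a Guava cache, as `getIfPresent`/`put` suggest; Guava caches reject null values) is to evict the entry and let the next `get()` rebuild it from RocksDB. A hypothetical sketch, not necessarily what the PR intends:
   ```java
   public void retract(RowData key, RowData value) throws IOException {
       try {
           byte[] bytes = invalidKeyAndGetKVBytes(key, value);
           if (db.get(columnFamily, bytes) != null) {
               db.delete(columnFamily, writeOptions, bytes);
               // Rather than caching a null, drop the cached value list so
               // the next get() re-reads the remaining set from RocksDB.
               cache.invalidate(wrap(serializeKey(key)));
           }
       } catch (RocksDBException e) {
           throw new IOException(e);
       }
   }
   ```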



##########
docs/content/docs/development/lookup-join.md:
##########
@@ -0,0 +1,79 @@
+---
+title: "Lookup Join"
+weight: 6
+type: docs
+aliases:
+- /development/lookup-join.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Lookup Join
+
+A [Lookup Join](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/joins/)
+is used to enrich a table with data that is queried from Flink Table Store. The join requires one table to have
+a processing time attribute and the other table to be backed by a lookup source connector.
+
+First, create a table, and update it in real-time.
+
+```sql
+-- Create a table in table-store catalog
+CREATE TABLE customers (

Review Comment:
   Add the following SQL before this line:
   ```
   CREATE CATALOG my_catalog WITH (
     'type'='table-store',
     'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file://tmp/foo/bar'
   );
   
   USE CATALOG my_catalog;
   ```



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Callable;
+
+/** Enumerator to enumerate incremental snapshots. */
+public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+
+    private final TableScan scan;
+
+    private long nextSnapshotId;
+
+    public SnapshotEnumerator(Path tablePath, TableScan scan, long currentSnapshot) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.nextSnapshotId = currentSnapshot + 1;
+    }
+
+    @Nullable
+    @Override
+    public EnumeratorResult call() {
+        // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                // TODO check latest snapshot id, expired?
+                LOG.debug(
+                        "Next snapshot id {} not exists, wait for it to be generated.",

Review Comment:
   ```suggestion
                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
   ```


