This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff13dc82 [flink] Lookup join supports time periods black list (#4413)
3ff13dc82 is described below

commit 3ff13dc821e26320ee27d4952db953aca44a5c8e
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 5 18:09:12 2024 +0800

    [flink] Lookup join supports time periods black list (#4413)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |   9 ++
 .../flink/lookup/FileStoreLookupFunction.java      | 172 +++++++++++++++------
 .../flink/lookup/FileStoreLookupFunctionTest.java  |  95 ++++++++++--
 4 files changed, 226 insertions(+), 56 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 2e6fb1f38..9da703f44 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@ under the License.
             <td>Integer</td>
             <td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.refresh.time-periods-blacklist</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The blacklist contains several time periods. During these time periods, the lookup table's cache refreshing is forbidden. Blacklist format is start1-&gt;end1,start2-&gt;end2,... , and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode.</td>
+        </tr>
         <tr>
             <td><h5>partition.idle-time-to-done</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e7bc6d23d..5716cfca1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -311,6 +311,15 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.");
 
+    public static final ConfigOption<String> LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST =
+            ConfigOptions.key("lookup.refresh.time-periods-blacklist")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The blacklist contains several time periods. During these time periods, the lookup table's "
+                                    + "cache refreshing is forbidden. Blacklist format is start1->end1,start2->end2,... , "
+                                    + "and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode.");
+
     public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
             ConfigOptions.key("sink.savepoint.auto-tag")
                     .booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 347370c68..90e447690 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -32,9 +32,12 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -53,6 +56,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -62,6 +66,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -69,6 +74,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
@@ -88,12 +94,16 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     private final List<String> joinKeys;
     @Nullable private final Predicate predicate;
 
-    private transient Duration refreshInterval;
+    private final List<Pair<Long, Long>> timePeriodsBlacklist;
+    private long nextBlacklistCheckTime;
+
     private transient File path;
     private transient LookupTable lookupTable;
 
-    // timestamp when cache expires
-    private transient long nextLoadTime;
+    // interval of refreshing lookup table
+    private transient Duration refreshInterval;
+    // timestamp when refreshing lookup table
+    private transient long nextRefreshTime;
 
     protected FunctionContext functionContext;
 
@@ -131,6 +141,45 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
 
         this.predicate = predicate;
+
+        this.timePeriodsBlacklist =
+                parseTimePeriodsBlacklist(
+                        table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
+        this.nextBlacklistCheckTime = -1;
+    }
+
+    private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
+        if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
+            return Collections.emptyList();
+        }
+        String[] timePeriods = blacklist.split(",");
+        List<Pair<Long, Long>> result = new ArrayList<>();
+        for (String period : timePeriods) {
+            String[] times = period.split("->");
+            if (times.length != 2) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time periods format: [%s].", blacklist));
+            }
+
+            long left = parseToMillis(times[0]);
+            long right = parseToMillis(times[1]);
+            if (left > right) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time period: [%s->%s].", times[0], times[1]));
+            }
+            result.add(Pair.of(left, right));
+        }
+        return result;
+    }
+
+    private long parseToMillis(String dateTime) {
+        try {
+            return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
+                    .getMillisecond();
+        } catch (DateTimeParseException e) {
+            throw new IllegalArgumentException(
+                    String.format("Date time format error: [%s].", dateTime), e);
+        }
     }
 
     public void open(FunctionContext context) throws Exception {
@@ -149,11 +198,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     }
 
     private void open() throws Exception {
-        if (partitionLoader != null) {
-            partitionLoader.open();
-        }
-
-        this.nextLoadTime = -1;
+        this.nextRefreshTime = -1;
 
         Options options = Options.fromMap(table.options());
         this.refreshInterval =
@@ -197,7 +242,15 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
             this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
         }
 
-        refreshDynamicPartition(false);
+        if (partitionLoader != null) {
+            partitionLoader.open();
+            partitionLoader.checkRefresh();
+            BinaryRow partition = partitionLoader.partition();
+            if (partition != null) {
+                lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+            }
+        }
+
         if (cacheRowFilter != null) {
             lookupTable.specifyCacheRowFilter(cacheRowFilter);
         }
@@ -222,15 +275,14 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
     public Collection<RowData> lookup(RowData keyRow) {
         try {
-            checkRefresh();
+            tryRefresh();
 
             InternalRow key = new FlinkRowWrapper(keyRow);
             if (partitionLoader != null) {
-                InternalRow partition = refreshDynamicPartition(true);
-                if (partition == null) {
+                if (partitionLoader.partition() == null) {
                     return Collections.emptyList();
                 }
-                key = JoinedRow.join(key, partition);
+                key = JoinedRow.join(key, partitionLoader.partition());
             }
 
             List<InternalRow> results = lookupTable.get(key);
@@ -247,28 +299,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
     }
 
-    @Nullable
-    private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception {
-        if (partitionLoader == null) {
-            return null;
-        }
-
-        boolean partitionChanged = partitionLoader.checkRefresh();
-        BinaryRow partition = partitionLoader.partition();
-        if (partition == null) {
-            return null;
-        }
-
-        lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
-
-        if (partitionChanged && reopen) {
-            lookupTable.close();
-            lookupTable.open();
-        }
-
-        return partition;
-    }
-
     private Predicate createSpecificPartFilter(BinaryRow partition) {
         RowType rowType = table.rowType();
         List<String> partitionKeys = table.partitionKeys();
@@ -293,20 +323,73 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
     }
 
-    private void checkRefresh() throws Exception {
-        if (nextLoadTime > System.currentTimeMillis()) {
+    @VisibleForTesting
+    void tryRefresh() throws Exception {
+        // 1. check if this time is in black list
+        long currentTimeMillis = System.currentTimeMillis();
+        if (nextBlacklistCheckTime > currentTimeMillis) {
             return;
         }
-        if (nextLoadTime > 0) {
+
+        Pair<Long, Long> period = getFirstTimePeriods(timePeriodsBlacklist, currentTimeMillis);
+        if (period != null) {
+            LOG.info(
+                    "Current time {} is in black list {}-{}, so try to refresh cache next time.",
+                    currentTimeMillis,
+                    period.getLeft(),
+                    period.getRight());
+            nextBlacklistCheckTime = period.getRight() + 1;
+            return;
+        }
+
+        // 2. refresh dynamic partition
+        if (partitionLoader != null) {
+            boolean partitionChanged = partitionLoader.checkRefresh();
+            BinaryRow partition = partitionLoader.partition();
+            if (partition == null) {
+                // no data to be load, fast exit
+                return;
+            }
+
+            if (partitionChanged) {
+                // reopen with latest partition
+                lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+                lookupTable.close();
+                lookupTable.open();
+                // no need to refresh the lookup table because it is reopened
+                return;
+            }
+        }
+
+        // 3. refresh lookup table
+        if (shouldRefreshLookupTable()) {
+            lookupTable.refresh();
+            nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis();
+        }
+    }
+
+    @Nullable
+    private Pair<Long, Long> getFirstTimePeriods(List<Pair<Long, Long>> timePeriods, long time) {
+        for (Pair<Long, Long> period : timePeriods) {
+            if (period.getLeft() <= time && time <= period.getRight()) {
+                return period;
+            }
+        }
+        return null;
+    }
+
+    private boolean shouldRefreshLookupTable() {
+        if (nextRefreshTime > System.currentTimeMillis()) {
+            return false;
+        }
+
+        if (nextRefreshTime > 0) {
             LOG.info(
                     "Lookup table {} has refreshed after {} second(s), refreshing",
                     table.name(),
                     refreshInterval.toMillis() / 1000);
         }
-
-        refresh();
-
-        nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
+        return true;
     }
 
     @VisibleForTesting
@@ -314,8 +397,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         return lookupTable;
     }
 
-    private void refresh() throws Exception {
-        lookupTable.refresh();
+    @VisibleForTesting
+    long nextBlacklistCheckTime() {
+        return nextBlacklistCheckTime;
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index f8c8adcb2..2b219f6a7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -52,6 +53,9 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -59,8 +63,11 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 /** Tests for {@link FileStoreLookupFunction}. */
 public class FileStoreLookupFunctionTest {
@@ -91,6 +98,18 @@ public class FileStoreLookupFunctionTest {
             boolean dynamicPartition,
             boolean refreshAsync)
             throws Exception {
+        table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync);
+        lookupFunction = createLookupFunction(table, joinEqualPk);
+        lookupFunction.open(tempDir.toString());
+    }
+
+    private FileStoreLookupFunction createLookupFunction(Table table, boolean joinEqualPk) {
+        return new FileStoreLookupFunction(
+                table, new int[] {0, 1}, joinEqualPk ? new int[] {0, 1} : new int[] {1}, null);
+    }
+
+    private FileStoreTable createFileStoreTable(
+            boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception {
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         Options conf = new Options();
         conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
@@ -106,7 +125,6 @@ public class FileStoreLookupFunctionTest {
                 RowType.of(
                         new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                         new String[] {"pt", "k", "v"});
-
         Schema schema =
                 new Schema(
                         rowType.getFields(),
@@ -115,17 +133,8 @@ public class FileStoreLookupFunctionTest {
                         conf.toMap(),
                         "");
         TableSchema tableSchema = schemaManager.createTable(schema);
-        table =
-                FileStoreTableFactory.create(
-                        fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
-
-        lookupFunction =
-                new FileStoreLookupFunction(
-                        table,
-                        new int[] {0, 1},
-                        joinEqualPk ? new int[] {0, 1} : new int[] {1},
-                        null);
-        lookupFunction.open(tempDir.toString());
+        return FileStoreTableFactory.create(
+                fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
     }
 
     @AfterEach
@@ -214,6 +223,68 @@ public class FileStoreLookupFunctionTest {
                 .isEqualTo(0);
     }
 
+    @Test
+    public void testParseWrongTimePeriodsBlacklist() throws Exception {
+        Table table = createFileStoreTable(false, false, false);
+
+        Table table1 =
+                table.copy(
+                        Collections.singletonMap(
+                                LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+                                "2024-10-31 12:00,2024-10-31 16:00"));
+        assertThatThrownBy(() -> createLookupFunction(table1, true))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Incorrect time periods format: [2024-10-31 12:00,2024-10-31 16:00]."));
+
+        Table table2 =
+                table.copy(
+                        Collections.singletonMap(
+                                LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+                                "20241031 12:00->20241031 16:00"));
+        assertThatThrownBy(() -> createLookupFunction(table2, true))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Date time format error: [20241031 12:00]"));
+
+        Table table3 =
+                table.copy(
+                        Collections.singletonMap(
+                                LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+                                "2024-10-31 12:00->2024-10-31 16:00,2024-10-31 20:00->2024-10-31 18:00"));
+        assertThatThrownBy(() -> createLookupFunction(table3, true))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "Incorrect time period: [2024-10-31 20:00->2024-10-31 18:00]"));
+    }
+
+    @Test
+    public void testCheckRefreshInBlacklist() throws Exception {
+        Instant now = Instant.now();
+        Instant start = Instant.ofEpochSecond(now.getEpochSecond() / 60 * 60);
+        Instant end = start.plusSeconds(30 * 60);
+
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
+        String left = start.atZone(ZoneId.systemDefault()).format(formatter);
+        String right = end.atZone(ZoneId.systemDefault()).format(formatter);
+
+        Table table =
+                createFileStoreTable(false, false, false)
+                        .copy(
+                                Collections.singletonMap(
+                                        LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+                                        left + "->" + right));
+
+        FileStoreLookupFunction lookupFunction = createLookupFunction(table, true);
+
+        lookupFunction.tryRefresh();
+
+        
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli()
 + 1);
+    }
+
     private void commit(List<CommitMessage> messages) throws Exception {
         TableCommitImpl commit = table.newCommit(commitUser);
         commit.commit(messages);

Reply via email to