This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c17cdd508 [flink] Extract RefreshBlacklist from FileStoreLookupFunction
c17cdd508 is described below

commit c17cdd508ae283dfe9c182f9373eac711cf74c53
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 5 18:22:41 2024 +0800

    [flink] Extract RefreshBlacklist from FileStoreLookupFunction
---
 .../flink/lookup/FileStoreLookupFunction.java      |  74 +-------------
 .../paimon/flink/lookup/RefreshBlacklist.java      | 112 +++++++++++++++++++++
 2 files changed, 117 insertions(+), 69 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 90e447690..1c5619133 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -32,12 +32,9 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -56,7 +53,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.time.Duration;
-import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -66,7 +62,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -93,9 +88,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     private final List<String> projectFields;
     private final List<String> joinKeys;
     @Nullable private final Predicate predicate;
-
-    private final List<Pair<Long, Long>> timePeriodsBlacklist;
-    private long nextBlacklistCheckTime;
+    private final RefreshBlacklist refreshBlacklist;
 
     private transient File path;
     private transient LookupTable lookupTable;
@@ -142,44 +135,9 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
         this.predicate = predicate;
 
-        this.timePeriodsBlacklist =
-                parseTimePeriodsBlacklist(
+        this.refreshBlacklist =
+                new RefreshBlacklist(
                         table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
-        this.nextBlacklistCheckTime = -1;
-    }
-
-    private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
-        if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
-            return Collections.emptyList();
-        }
-        String[] timePeriods = blacklist.split(",");
-        List<Pair<Long, Long>> result = new ArrayList<>();
-        for (String period : timePeriods) {
-            String[] times = period.split("->");
-            if (times.length != 2) {
-                throw new IllegalArgumentException(
-                        String.format("Incorrect time periods format: [%s].", blacklist));
-            }
-
-            long left = parseToMillis(times[0]);
-            long right = parseToMillis(times[1]);
-            if (left > right) {
-                throw new IllegalArgumentException(
-                        String.format("Incorrect time period: [%s->%s].", times[0], times[1]));
-            }
-            result.add(Pair.of(left, right));
-        }
-        return result;
-    }
-
-    private long parseToMillis(String dateTime) {
-        try {
-            return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
-                    .getMillisecond();
-        } catch (DateTimeParseException e) {
-            throw new IllegalArgumentException(
-                    String.format("Date time format error: [%s].", dateTime), e);
-        }
     }
 
     public void open(FunctionContext context) throws Exception {
@@ -326,19 +284,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     @VisibleForTesting
     void tryRefresh() throws Exception {
         // 1. check if this time is in black list
-        long currentTimeMillis = System.currentTimeMillis();
-        if (nextBlacklistCheckTime > currentTimeMillis) {
-            return;
-        }
-
-        Pair<Long, Long> period = getFirstTimePeriods(timePeriodsBlacklist, currentTimeMillis);
-        if (period != null) {
-            LOG.info(
-                    "Current time {} is in black list {}-{}, so try to refresh cache next time.",
-                    currentTimeMillis,
-                    period.getLeft(),
-                    period.getRight());
-            nextBlacklistCheckTime = period.getRight() + 1;
+        if (!refreshBlacklist.canRefresh()) {
             return;
         }
 
@@ -368,16 +314,6 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
         }
     }
 
-    @Nullable
-    private Pair<Long, Long> getFirstTimePeriods(List<Pair<Long, Long>> timePeriods, long time) {
-        for (Pair<Long, Long> period : timePeriods) {
-            if (period.getLeft() <= time && time <= period.getRight()) {
-                return period;
-            }
-        }
-        return null;
-    }
-
     private boolean shouldRefreshLookupTable() {
         if (nextRefreshTime > System.currentTimeMillis()) {
             return false;
@@ -399,7 +335,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
     @VisibleForTesting
     long nextBlacklistCheckTime() {
-        return nextBlacklistCheckTime;
+        return refreshBlacklist.nextBlacklistCheckTime();
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java
new file mode 100644
index 000000000..96923550b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TimeZone;
+
+/** Refresh black list for {@link FileStoreLookupFunction}. */
+public class RefreshBlacklist {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RefreshBlacklist.class);
+
+    private final List<Pair<Long, Long>> timePeriodsBlacklist;
+
+    private long nextBlacklistCheckTime;
+
+    public RefreshBlacklist(String blacklist) {
+        this.timePeriodsBlacklist = parseTimePeriodsBlacklist(blacklist);
+        this.nextBlacklistCheckTime = -1;
+    }
+
+    private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
+        if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
+            return Collections.emptyList();
+        }
+        String[] timePeriods = blacklist.split(",");
+        List<Pair<Long, Long>> result = new ArrayList<>();
+        for (String period : timePeriods) {
+            String[] times = period.split("->");
+            if (times.length != 2) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time periods format: [%s].", blacklist));
+            }
+
+            long left = parseToMillis(times[0]);
+            long right = parseToMillis(times[1]);
+            if (left > right) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time period: [%s->%s].", times[0], times[1]));
+            }
+            result.add(Pair.of(left, right));
+        }
+        return result;
+    }
+
+    private long parseToMillis(String dateTime) {
+        try {
+            return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
+                    .getMillisecond();
+        } catch (DateTimeParseException e) {
+            throw new IllegalArgumentException(
+                    String.format("Date time format error: [%s].", dateTime), e);
+        }
+    }
+
+    public boolean canRefresh() {
+        long currentTimeMillis = System.currentTimeMillis();
+        if (currentTimeMillis < nextBlacklistCheckTime) {
+            return false;
+        }
+
+        Pair<Long, Long> selectedPeriod = null;
+        for (Pair<Long, Long> period : timePeriodsBlacklist) {
+            if (period.getLeft() <= currentTimeMillis && currentTimeMillis <= period.getRight()) {
+                selectedPeriod = period;
+                break;
+            }
+        }
+
+        if (selectedPeriod != null) {
+            LOG.info(
+                    "Current time {} is in black list {}-{}, so try to refresh cache next time.",
+                    currentTimeMillis,
+                    selectedPeriod.getLeft(),
+                    selectedPeriod.getRight());
+            nextBlacklistCheckTime = selectedPeriod.getRight() + 1;
+            return false;
+        }
+
+        return true;
+    }
+
+    public long nextBlacklistCheckTime() {
+        return nextBlacklistCheckTime;
+    }
+}

Reply via email to