This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c17cdd508 [flink] Extract RefreshBlacklist from FileStoreLookupFunction
c17cdd508 is described below
commit c17cdd508ae283dfe9c182f9373eac711cf74c53
Author: Jingsong <[email protected]>
AuthorDate: Tue Nov 5 18:22:41 2024 +0800
[flink] Extract RefreshBlacklist from FileStoreLookupFunction
---
.../flink/lookup/FileStoreLookupFunction.java | 74 +-------------
.../paimon/flink/lookup/RefreshBlacklist.java | 112 +++++++++++++++++++++
2 files changed, 117 insertions(+), 69 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 90e447690..1c5619133 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -32,12 +32,9 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -56,7 +53,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
-import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -66,7 +62,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -93,9 +88,7 @@ public class FileStoreLookupFunction implements Serializable,
Closeable {
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
-
- private final List<Pair<Long, Long>> timePeriodsBlacklist;
- private long nextBlacklistCheckTime;
+ private final RefreshBlacklist refreshBlacklist;
private transient File path;
private transient LookupTable lookupTable;
@@ -142,44 +135,9 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
this.predicate = predicate;
- this.timePeriodsBlacklist =
- parseTimePeriodsBlacklist(
+ this.refreshBlacklist =
+ new RefreshBlacklist(
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
- this.nextBlacklistCheckTime = -1;
- }
-
- private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist)
{
- if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
- return Collections.emptyList();
- }
- String[] timePeriods = blacklist.split(",");
- List<Pair<Long, Long>> result = new ArrayList<>();
- for (String period : timePeriods) {
- String[] times = period.split("->");
- if (times.length != 2) {
- throw new IllegalArgumentException(
- String.format("Incorrect time periods format: [%s].",
blacklist));
- }
-
- long left = parseToMillis(times[0]);
- long right = parseToMillis(times[1]);
- if (left > right) {
- throw new IllegalArgumentException(
- String.format("Incorrect time period: [%s->%s].",
times[0], times[1]));
- }
- result.add(Pair.of(left, right));
- }
- return result;
- }
-
- private long parseToMillis(String dateTime) {
- try {
- return DateTimeUtils.parseTimestampData(dateTime + ":00", 3,
TimeZone.getDefault())
- .getMillisecond();
- } catch (DateTimeParseException e) {
- throw new IllegalArgumentException(
- String.format("Date time format error: [%s].", dateTime),
e);
- }
}
public void open(FunctionContext context) throws Exception {
@@ -326,19 +284,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
@VisibleForTesting
void tryRefresh() throws Exception {
// 1. check if this time is in black list
- long currentTimeMillis = System.currentTimeMillis();
- if (nextBlacklistCheckTime > currentTimeMillis) {
- return;
- }
-
- Pair<Long, Long> period = getFirstTimePeriods(timePeriodsBlacklist,
currentTimeMillis);
- if (period != null) {
- LOG.info(
- "Current time {} is in black list {}-{}, so try to refresh
cache next time.",
- currentTimeMillis,
- period.getLeft(),
- period.getRight());
- nextBlacklistCheckTime = period.getRight() + 1;
+ if (!refreshBlacklist.canRefresh()) {
return;
}
@@ -368,16 +314,6 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
}
- @Nullable
- private Pair<Long, Long> getFirstTimePeriods(List<Pair<Long, Long>>
timePeriods, long time) {
- for (Pair<Long, Long> period : timePeriods) {
- if (period.getLeft() <= time && time <= period.getRight()) {
- return period;
- }
- }
- return null;
- }
-
private boolean shouldRefreshLookupTable() {
if (nextRefreshTime > System.currentTimeMillis()) {
return false;
@@ -399,7 +335,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
@VisibleForTesting
long nextBlacklistCheckTime() {
- return nextBlacklistCheckTime;
+ return refreshBlacklist.nextBlacklistCheckTime();
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java
new file mode 100644
index 000000000..96923550b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RefreshBlacklist.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TimeZone;
+
+/** Refresh black list for {@link FileStoreLookupFunction}. */
+public class RefreshBlacklist {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RefreshBlacklist.class);
+
+ private final List<Pair<Long, Long>> timePeriodsBlacklist;
+
+ private long nextBlacklistCheckTime;
+
+ public RefreshBlacklist(String blacklist) {
+ this.timePeriodsBlacklist = parseTimePeriodsBlacklist(blacklist);
+ this.nextBlacklistCheckTime = -1;
+ }
+
+ private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist)
{
+ if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
+ return Collections.emptyList();
+ }
+ String[] timePeriods = blacklist.split(",");
+ List<Pair<Long, Long>> result = new ArrayList<>();
+ for (String period : timePeriods) {
+ String[] times = period.split("->");
+ if (times.length != 2) {
+ throw new IllegalArgumentException(
+ String.format("Incorrect time periods format: [%s].",
blacklist));
+ }
+
+ long left = parseToMillis(times[0]);
+ long right = parseToMillis(times[1]);
+ if (left > right) {
+ throw new IllegalArgumentException(
+ String.format("Incorrect time period: [%s->%s].",
times[0], times[1]));
+ }
+ result.add(Pair.of(left, right));
+ }
+ return result;
+ }
+
+ private long parseToMillis(String dateTime) {
+ try {
+ return DateTimeUtils.parseTimestampData(dateTime + ":00", 3,
TimeZone.getDefault())
+ .getMillisecond();
+ } catch (DateTimeParseException e) {
+ throw new IllegalArgumentException(
+ String.format("Date time format error: [%s].", dateTime),
e);
+ }
+ }
+
+ public boolean canRefresh() {
+ long currentTimeMillis = System.currentTimeMillis();
+ if (currentTimeMillis < nextBlacklistCheckTime) {
+ return false;
+ }
+
+ Pair<Long, Long> selectedPeriod = null;
+ for (Pair<Long, Long> period : timePeriodsBlacklist) {
+ if (period.getLeft() <= currentTimeMillis && currentTimeMillis <=
period.getRight()) {
+ selectedPeriod = period;
+ break;
+ }
+ }
+
+ if (selectedPeriod != null) {
+ LOG.info(
+ "Current time {} is in black list {}-{}, so try to refresh
cache next time.",
+ currentTimeMillis,
+ selectedPeriod.getLeft(),
+ selectedPeriod.getRight());
+ nextBlacklistCheckTime = selectedPeriod.getRight() + 1;
+ return false;
+ }
+
+ return true;
+ }
+
+ public long nextBlacklistCheckTime() {
+ return nextBlacklistCheckTime;
+ }
+}