This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff13dc82 [flink] Lookup join supports time periods black list (#4413)
3ff13dc82 is described below
commit 3ff13dc821e26320ee27d4952db953aca44a5c8e
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 5 18:09:12 2024 +0800
[flink] Lookup join supports time periods black list (#4413)
---
.../generated/flink_connector_configuration.html | 6 +
.../apache/paimon/flink/FlinkConnectorOptions.java | 9 ++
.../flink/lookup/FileStoreLookupFunction.java | 172 +++++++++++++++------
.../flink/lookup/FileStoreLookupFunctionTest.java | 95 ++++++++++--
4 files changed, 226 insertions(+), 56 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 2e6fb1f38..9da703f44 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,6 +86,12 @@ under the License.
<td>Integer</td>
<td>If the pending snapshot count exceeds the threshold, lookup
operator will refresh the table in sync.</td>
</tr>
+ <tr>
+ <td><h5>lookup.refresh.time-periods-blacklist</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The blacklist contains several time periods. During these time
periods, the lookup table's cache refreshing is forbidden. Blacklist format is
start1->end1,start2->end2,... , and the time format is yyyy-MM-dd HH:mm.
Only used when lookup table is FULL cache mode.</td>
+ </tr>
<tr>
<td><h5>partition.idle-time-to-done</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index e7bc6d23d..5716cfca1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -311,6 +311,15 @@ public class FlinkConnectorOptions {
.withDescription(
"If the pending snapshot count exceeds the
threshold, lookup operator will refresh the table in sync.");
+ // NOTE(review): the description says "Only used when lookup table is FULL cache mode", but
+ // FileStoreLookupFunction#tryRefresh performs the blacklist check without looking at the
+ // cache mode -- confirm which behavior is intended and align code or description.
+ /**
+ * Time periods during which the lookup function skips refreshing (both dynamic partitions and
+ * the lookup table). Value format: {@code start1->end1,start2->end2,...}, each time written as
+ * {@code yyyy-MM-dd HH:mm}. The value is parsed, in the JVM default time zone, when
+ * FileStoreLookupFunction is constructed.
+ */
+ public static final ConfigOption<String>
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST =
+ ConfigOptions.key("lookup.refresh.time-periods-blacklist")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The blacklist contains several time periods.
During these time periods, the lookup table's "
+ + "cache refreshing is forbidden.
Blacklist format is start1->end1,start2->end2,... , "
+ + "and the time format is yyyy-MM-dd
HH:mm. Only used when lookup table is FULL cache mode.");
+
public static final ConfigOption<Boolean> SINK_AUTO_TAG_FOR_SAVEPOINT =
ConfigOptions.key("sink.savepoint.auto-tag")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 347370c68..90e447690 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -32,9 +32,12 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -53,6 +56,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
+import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -62,6 +66,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -69,6 +74,7 @@ import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static
org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
@@ -88,12 +94,16 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
- private transient Duration refreshInterval;
+ private final List<Pair<Long, Long>> timePeriodsBlacklist;
+ private long nextBlacklistCheckTime;
+
private transient File path;
private transient LookupTable lookupTable;
- // timestamp when cache expires
- private transient long nextLoadTime;
+ // interval of refreshing lookup table
+ private transient Duration refreshInterval;
+ // timestamp when refreshing lookup table
+ private transient long nextRefreshTime;
protected FunctionContext functionContext;
@@ -131,6 +141,45 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
this.predicate = predicate;
+
+ this.timePeriodsBlacklist =
+ parseTimePeriodsBlacklist(
+
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
+ this.nextBlacklistCheckTime = -1;
+ }
+
+    /**
+     * Parses the {@code lookup.refresh.time-periods-blacklist} option value into a list of
+     * {@code (left, right)} epoch-millisecond pairs.
+     *
+     * <p>Expected format: {@code start1->end1,start2->end2,...}, where every time is written as
+     * {@code yyyy-MM-dd HH:mm} and is converted by {@link #parseToMillis(String)} (JVM default
+     * time zone). Whitespace around each time is ignored, e.g. {@code "a -> b, c -> d"}.
+     *
+     * @param blacklist raw option value; may be null or blank
+     * @return the parsed periods in declaration order, or an empty list when the option is unset
+     * @throws IllegalArgumentException if a period does not consist of exactly one start and one
+     *     end separated by {@code ->}, if a time cannot be parsed, or if a start is after its end
+     */
+    private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
+        if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
+            return Collections.emptyList();
+        }
+        String[] timePeriods = blacklist.split(",");
+        List<Pair<Long, Long>> result = new ArrayList<>(timePeriods.length);
+        for (String period : timePeriods) {
+            // limit -1 keeps trailing empty strings, so a dangling separator such as
+            // "a->b->" is rejected instead of being silently accepted as "a->b"
+            String[] times = period.split("->", -1);
+            if (times.length != 2) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time periods format: [%s].", blacklist));
+            }
+
+            // parseToMillis appends ":00", so surrounding spaces would otherwise always fail
+            String start = times[0].trim();
+            String end = times[1].trim();
+            long left = parseToMillis(start);
+            long right = parseToMillis(end);
+            if (left > right) {
+                throw new IllegalArgumentException(
+                        String.format("Incorrect time period: [%s->%s].", start, end));
+            }
+            result.add(Pair.of(left, right));
+        }
+        return result;
+    }
+
+ private long parseToMillis(String dateTime) {
+ try {
+ return DateTimeUtils.parseTimestampData(dateTime + ":00", 3,
TimeZone.getDefault())
+ .getMillisecond();
+ } catch (DateTimeParseException e) {
+ throw new IllegalArgumentException(
+ String.format("Date time format error: [%s].", dateTime),
e);
+ }
}
public void open(FunctionContext context) throws Exception {
@@ -149,11 +198,7 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
private void open() throws Exception {
- if (partitionLoader != null) {
- partitionLoader.open();
- }
-
- this.nextLoadTime = -1;
+ this.nextRefreshTime = -1;
Options options = Options.fromMap(table.options());
this.refreshInterval =
@@ -197,7 +242,15 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
this.lookupTable = FullCacheLookupTable.create(context,
options.get(LOOKUP_CACHE_ROWS));
}
- refreshDynamicPartition(false);
+ if (partitionLoader != null) {
+ partitionLoader.open();
+ partitionLoader.checkRefresh();
+ BinaryRow partition = partitionLoader.partition();
+ if (partition != null) {
+
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+ }
+ }
+
if (cacheRowFilter != null) {
lookupTable.specifyCacheRowFilter(cacheRowFilter);
}
@@ -222,15 +275,14 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
public Collection<RowData> lookup(RowData keyRow) {
try {
- checkRefresh();
+ tryRefresh();
InternalRow key = new FlinkRowWrapper(keyRow);
if (partitionLoader != null) {
- InternalRow partition = refreshDynamicPartition(true);
- if (partition == null) {
+ if (partitionLoader.partition() == null) {
return Collections.emptyList();
}
- key = JoinedRow.join(key, partition);
+ key = JoinedRow.join(key, partitionLoader.partition());
}
List<InternalRow> results = lookupTable.get(key);
@@ -247,28 +299,6 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
}
- @Nullable
- private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception
{
- if (partitionLoader == null) {
- return null;
- }
-
- boolean partitionChanged = partitionLoader.checkRefresh();
- BinaryRow partition = partitionLoader.partition();
- if (partition == null) {
- return null;
- }
-
-
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
-
- if (partitionChanged && reopen) {
- lookupTable.close();
- lookupTable.open();
- }
-
- return partition;
- }
-
private Predicate createSpecificPartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
@@ -293,20 +323,73 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
}
}
- private void checkRefresh() throws Exception {
- if (nextLoadTime > System.currentTimeMillis()) {
+ @VisibleForTesting
+ void tryRefresh() throws Exception {
+ // 1. check if this time is in black list
+ long currentTimeMillis = System.currentTimeMillis();
+ if (nextBlacklistCheckTime > currentTimeMillis) {
return;
}
- if (nextLoadTime > 0) {
+
+ Pair<Long, Long> period = getFirstTimePeriods(timePeriodsBlacklist,
currentTimeMillis);
+ if (period != null) {
+ LOG.info(
+ "Current time {} is in black list {}-{}, so try to refresh
cache next time.",
+ currentTimeMillis,
+ period.getLeft(),
+ period.getRight());
+ nextBlacklistCheckTime = period.getRight() + 1;
+ return;
+ }
+
+ // 2. refresh dynamic partition
+ if (partitionLoader != null) {
+ boolean partitionChanged = partitionLoader.checkRefresh();
+ BinaryRow partition = partitionLoader.partition();
+ if (partition == null) {
+ // no data to be load, fast exit
+ return;
+ }
+
+ if (partitionChanged) {
+ // reopen with latest partition
+
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+ lookupTable.close();
+ lookupTable.open();
+ // no need to refresh the lookup table because it is reopened
+ return;
+ }
+ }
+
+ // 3. refresh lookup table
+ if (shouldRefreshLookupTable()) {
+ lookupTable.refresh();
+ nextRefreshTime = System.currentTimeMillis() +
refreshInterval.toMillis();
+ }
+ }
+
+    /**
+     * Looks up the first period, in list order, whose inclusive bounds contain {@code time}.
+     *
+     * @param timePeriods candidate periods as {@code (left, right)} pairs
+     * @param time the instant to test, in the same unit as the pair bounds
+     * @return the first period with {@code left <= time <= right}, or {@code null} if none
+     */
+    @Nullable
+    private Pair<Long, Long> getFirstTimePeriods(List<Pair<Long, Long>> timePeriods, long time) {
+        for (Pair<Long, Long> candidate : timePeriods) {
+            if (time < candidate.getLeft() || time > candidate.getRight()) {
+                // outside this period, keep scanning
+                continue;
+            }
+            return candidate;
+        }
+        return null;
+    }
+
+ private boolean shouldRefreshLookupTable() {
+ if (nextRefreshTime > System.currentTimeMillis()) {
+ return false;
+ }
+
+ if (nextRefreshTime > 0) {
LOG.info(
"Lookup table {} has refreshed after {} second(s),
refreshing",
table.name(),
refreshInterval.toMillis() / 1000);
}
-
- refresh();
-
- nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
+ return true;
}
@VisibleForTesting
@@ -314,8 +397,9 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
return lookupTable;
}
- private void refresh() throws Exception {
- lookupTable.refresh();
+ @VisibleForTesting
+ long nextBlacklistCheckTime() {
+ return nextBlacklistCheckTime;
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index f8c8adcb2..2b219f6a7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
@@ -52,6 +53,9 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -59,8 +63,11 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
+import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** Tests for {@link FileStoreLookupFunction}. */
public class FileStoreLookupFunctionTest {
@@ -91,6 +98,18 @@ public class FileStoreLookupFunctionTest {
boolean dynamicPartition,
boolean refreshAsync)
throws Exception {
+ table = createFileStoreTable(isPartition, dynamicPartition,
refreshAsync);
+ lookupFunction = createLookupFunction(table, joinEqualPk);
+ lookupFunction.open(tempDir.toString());
+ }
+
+    /**
+     * Builds a {@link FileStoreLookupFunction} for {@code table} without opening it; callers that
+     * need a working function must call {@code open} themselves.
+     */
+    private FileStoreLookupFunction createLookupFunction(Table table, boolean joinEqualPk) {
+        int[] projection = {0, 1};
+        int[] joinKeyIndexes = joinEqualPk ? new int[] {0, 1} : new int[] {1};
+        return new FileStoreLookupFunction(table, projection, joinKeyIndexes, null);
+    }
+
+ private FileStoreTable createFileStoreTable(
+ boolean isPartition, boolean dynamicPartition, boolean
refreshAsync) throws Exception {
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
Options conf = new Options();
conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
@@ -106,7 +125,6 @@ public class FileStoreLookupFunctionTest {
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
new String[] {"pt", "k", "v"});
-
Schema schema =
new Schema(
rowType.getFields(),
@@ -115,17 +133,8 @@ public class FileStoreLookupFunctionTest {
conf.toMap(),
"");
TableSchema tableSchema = schemaManager.createTable(schema);
- table =
- FileStoreTableFactory.create(
- fileIO, new
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
-
- lookupFunction =
- new FileStoreLookupFunction(
- table,
- new int[] {0, 1},
- joinEqualPk ? new int[] {0, 1} : new int[] {1},
- null);
- lookupFunction.open(tempDir.toString());
+ return FileStoreTableFactory.create(
+ fileIO, new org.apache.paimon.fs.Path(tempDir.toString()),
tableSchema);
}
@AfterEach
@@ -214,6 +223,68 @@ public class FileStoreLookupFunctionTest {
.isEqualTo(0);
}
+ /** Malformed blacklist values must be rejected as soon as the lookup function is constructed. */
+ @Test
+ public void testParseWrongTimePeriodsBlacklist() throws Exception {
+ Table table = createFileStoreTable(false, false, false);
+
+ // Case 1: no "->" between start and end, so the whole option value is reported as malformed.
+ Table table1 =
+ table.copy(
+ Collections.singletonMap(
+ LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+ "2024-10-31 12:00,2024-10-31 16:00"));
+ assertThatThrownBy(() -> createLookupFunction(table1, true))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Incorrect time periods format: [2024-10-31
12:00,2024-10-31 16:00]."));
+
+ // Case 2: dates are not in yyyy-MM-dd form, so the offending time itself is reported.
+ Table table2 =
+ table.copy(
+ Collections.singletonMap(
+ LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+ "20241031 12:00->20241031 16:00"));
+ assertThatThrownBy(() -> createLookupFunction(table2, true))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Date time format error: [20241031 12:00]"));
+
+ // Case 3: the second period ends before it starts, so only that period is reported.
+ Table table3 =
+ table.copy(
+ Collections.singletonMap(
+ LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+ "2024-10-31 12:00->2024-10-31 16:00,2024-10-31
20:00->2024-10-31 18:00"));
+ assertThatThrownBy(() -> createLookupFunction(table3, true))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Incorrect time period: [2024-10-31
20:00->2024-10-31 18:00]"));
+ }
+
+ @Test
+ public void testCheckRefreshInBlacklist() throws Exception {
+ Instant now = Instant.now();
+ Instant start = Instant.ofEpochSecond(now.getEpochSecond() / 60 * 60);
+ Instant end = start.plusSeconds(30 * 60);
+
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm");
+ String left = start.atZone(ZoneId.systemDefault()).format(formatter);
+ String right = end.atZone(ZoneId.systemDefault()).format(formatter);
+
+ Table table =
+ createFileStoreTable(false, false, false)
+ .copy(
+ Collections.singletonMap(
+
LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(),
+ left + "->" + right));
+
+ FileStoreLookupFunction lookupFunction = createLookupFunction(table,
true);
+
+ lookupFunction.tryRefresh();
+
+
assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli()
+ 1);
+ }
+
private void commit(List<CommitMessage> messages) throws Exception {
TableCommitImpl commit = table.newCommit(commitUser);
commit.commit(messages);