This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7ae6448377 [core] process-time TagTimeExtractor supports custom zone (#5927)
7ae6448377 is described below
commit 7ae644837725181ff7e68a6e52c505fe7373d4cb
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 22 16:30:50 2025 +0800
[core] process-time TagTimeExtractor supports custom zone (#5927)
---
.../layouts/shortcodes/generated/core_configuration.html | 6 ++++++
.../src/main/java/org/apache/paimon/CoreOptions.java | 16 ++++++++++++++++
.../java/org/apache/paimon/tag/TagTimeExtractor.java | 12 ++++++++----
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f4276640e8..3d1bde2e94 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -971,6 +971,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td><p>Enum</p></td>
<td>Specify the order of sequence.field.<br /><br />Possible
values:<ul><li>"ascending": specifies sequence.field sort order is
ascending.</li><li>"descending": specifies sequence.field sort order is
descending.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>sink.process-time-zone</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The time zone to parse the long process time to TIMESTAMP
value. The default value is JVM's default time zone. If you want to specify a
time zone, you should either set a full name such as 'America/Los_Angeles' or a
custom zone id such as 'GMT-08:00'. This option currently is used for extract
tag name.</td>
+ </tr>
<tr>
<td><h5>sink.watermark-time-zone</h5></td>
<td style="word-wrap: break-word;">"UTC"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d41fdcbfa9..e5eeb17dc7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1579,6 +1580,16 @@ public class CoreOptions implements Serializable {
+ " the value should be the user
configured local time zone. The option value is either a full name"
+ " such as 'America/Los_Angeles', or a
custom timezone id such as 'GMT-08:00'.");
+ public static final ConfigOption<String> SINK_PROCESS_TIME_ZONE =
+ key("sink.process-time-zone")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The time zone to parse the long process time to
TIMESTAMP value. The default value is JVM's "
+ + "default time zone. If you want to
specify a time zone, you should either set a "
+ + "full name such as 'America/Los_Angeles'
or a custom zone id such as 'GMT-08:00'. "
+ + "This option currently is used for
extract tag name.");
+
public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE =
key("local-merge-buffer-size")
.memoryType()
@@ -2730,6 +2741,11 @@ public class CoreOptions implements Serializable {
return options.get(SINK_WATERMARK_TIME_ZONE);
}
+ public ZoneId sinkProcessTimeZone() {
+ String zoneId = options.get(SINK_PROCESS_TIME_ZONE);
+ return zoneId == null ? ZoneId.systemDefault() : ZoneId.of(zoneId);
+ }
+
public boolean forceCreatingSnapshot() {
return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index 5b046e752f..b94c613251 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -36,12 +36,16 @@ public interface TagTimeExtractor {
/** Extract time from snapshot time millis. */
class ProcessTimeExtractor implements TagTimeExtractor {
+ private final ZoneId processTimeZoneId;
+
+ private ProcessTimeExtractor(ZoneId processTimeZoneId) {
+ this.processTimeZoneId = processTimeZoneId;
+ }
+
@Override
public Optional<LocalDateTime> extract(long timeMilli, @Nullable Long
watermark) {
return Optional.of(
- Instant.ofEpochMilli(timeMilli)
- .atZone(ZoneId.systemDefault())
- .toLocalDateTime());
+
Instant.ofEpochMilli(timeMilli).atZone(processTimeZoneId).toLocalDateTime());
}
}
@@ -82,7 +86,7 @@ public interface TagTimeExtractor {
case BATCH:
return null;
case PROCESS_TIME:
- return new ProcessTimeExtractor();
+ return new ProcessTimeExtractor(options.sinkProcessTimeZone());
case WATERMARK:
return new
WatermarkExtractor(ZoneId.of(options.sinkWatermarkTimeZone()));
default: