This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 051825bd002 [FLINK-26810][connectors/elasticsearch] Use local timezone 
for TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index
051825bd002 is described below

commit 051825bd00275cbea2a684ad8085b9563aa6fb47
Author: Alexander Preuß <11444089+alp...@users.noreply.github.com>
AuthorDate: Thu Mar 31 14:16:11 2022 +0200

    [FLINK-26810][connectors/elasticsearch] Use local timezone for 
TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index
---
 .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++--
 .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++--
 .../elasticsearch/table/IndexGeneratorTest.java    | 44 ++++++++++++++++++++++
 .../table/IndexGeneratorFactoryTest.java           | 35 ++++++++++++++---
 4 files changed, 85 insertions(+), 14 deletions(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index ec2a0069365..92886f40e09 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -33,7 +33,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -133,7 +132,8 @@ final class IndexGeneratorFactory {
             final String dateTimeFormat =
                     indexHelper.extractDateFormat(index, 
indexFieldLogicalTypeRoot);
             DynamicFormatter formatFunction =
-                    createFormatFunction(indexFieldType, 
indexFieldLogicalTypeRoot);
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, 
localTimeZoneId);
 
             return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
                 @Override
@@ -163,7 +163,9 @@ final class IndexGeneratorFactory {
     }
 
     private static DynamicFormatter createFormatFunction(
-            LogicalType indexFieldType, LogicalTypeRoot 
indexFieldLogicalTypeRoot) {
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
         switch (indexFieldLogicalTypeRoot) {
             case DATE:
                 return (value, dateTimeFormatter) -> {
@@ -186,7 +188,7 @@ final class IndexGeneratorFactory {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return (value, dateTimeFormatter) -> {
                     TimestampData indexField = (TimestampData) value;
-                    return 
indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                    return 
indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
                 };
             default:
                 throw new TableException(
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
index 48f0107b714..8347a479e5a 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -34,7 +34,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -130,7 +129,8 @@ final class IndexGeneratorFactory {
             final String dateTimeFormat =
                     indexHelper.extractDateFormat(index, 
indexFieldLogicalTypeRoot);
             DynamicFormatter formatFunction =
-                    createFormatFunction(indexFieldType, 
indexFieldLogicalTypeRoot);
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, 
localTimeZoneId);
 
             return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
                 @Override
@@ -160,7 +160,9 @@ final class IndexGeneratorFactory {
     }
 
     private static DynamicFormatter createFormatFunction(
-            LogicalType indexFieldType, LogicalTypeRoot 
indexFieldLogicalTypeRoot) {
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
         switch (indexFieldLogicalTypeRoot) {
             case DATE:
                 return (value, dateTimeFormatter) -> {
@@ -183,7 +185,7 @@ final class IndexGeneratorFactory {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return (value, dateTimeFormatter) -> {
                     TimestampData indexField = (TimestampData) value;
-                    return 
indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                    return 
indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
                 };
             default:
                 throw new TableException(
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index 7840bfa81a8..8760d0d4a43 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -35,11 +35,14 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.UnsupportedTemporalTypeException;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assumptions.assumingThat;
+
 /** Suite tests for {@link IndexGenerator}. */
 public class IndexGeneratorTest {
 
@@ -54,6 +57,7 @@ public class IndexGeneratorTest {
                     "local_datetime",
                     "local_date",
                     "local_time",
+                    "local_timestamp",
                     "note",
                     "status");
 
@@ -68,6 +72,7 @@ public class IndexGeneratorTest {
                     DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class),
                     DataTypes.DATE().bridgedTo(LocalDate.class),
                     DataTypes.TIME().bridgedTo(LocalTime.class),
+                    DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
                     DataTypes.STRING(),
                     DataTypes.BOOLEAN());
 
@@ -86,6 +91,10 @@ public class IndexGeneratorTest {
                                     LocalDateTime.of(2020, 3, 18, 12, 12, 14, 
1000)),
                             (int) LocalDate.of(2020, 3, 18).toEpochDay(),
                             (int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+                            TimestampData.fromInstant(
+                                    LocalDateTime.of(2020, 3, 18, 3, 12, 14, 
1000)
+                                            .atZone(ZoneId.of("Asia/Shanghai"))
+                                            .toInstant()),
                             "test1",
                             true),
                     GenericRowData.of(
@@ -101,9 +110,44 @@ public class IndexGeneratorTest {
                                     LocalDateTime.of(2020, 3, 19, 12, 22, 14, 
1000)),
                             (int) LocalDate.of(2020, 3, 19).toEpochDay(),
                             (int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+                            TimestampData.fromInstant(
+                                    LocalDateTime.of(2020, 3, 19, 20, 22, 14, 
1000)
+                                            
.atZone(ZoneId.of("America/Los_Angeles"))
+                                            .toInstant()),
                             "test2",
                             false));
 
+    @Test
+    public void testDynamicIndexFromTimestampTzUTC() {
+        assumingThat(
+                ZoneId.systemDefault().equals(ZoneId.of("UTC")),
+                () -> {
+                    IndexGenerator indexGenerator =
+                            IndexGeneratorFactory.createIndexGenerator(
+                                    "{local_timestamp|yyyy_MM_dd_HH-ss}_index",
+                                    fieldNames,
+                                    dataTypes);
+                    indexGenerator.open();
+                    Assertions.assertEquals(
+                            "2020_03_17_19-14_index", 
indexGenerator.generate(rows.get(0)));
+                    Assertions.assertEquals(
+                            "2020_03_20_03-14_index", 
indexGenerator.generate(rows.get(1)));
+                });
+    }
+
+    @Test
+    public void testDynamicIndexFromTimestampTzWithSpecificTimezone() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "{local_timestamp|yyyy_MM_dd_HH-ss}_index",
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.of("Europe/Berlin"));
+        indexGenerator.open();
+        Assertions.assertEquals("2020_03_17_20-14_index", 
indexGenerator.generate(rows.get(0)));
+        Assertions.assertEquals("2020_03_20_04-14_index", 
indexGenerator.generate(rows.get(1)));
+    }
+
     @Test
     public void testDynamicIndexFromTimestamp() {
         IndexGenerator indexGenerator =
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
index a5f77595128..bbb5081b733 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
@@ -32,16 +32,19 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.sql.Timestamp;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.UnsupportedTemporalTypeException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assume.assumeThat;
+
 /** Tests for {@link IndexGeneratorFactory}. */
 public class IndexGeneratorFactoryTest extends TestLogger {
 
@@ -71,7 +74,10 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                         (int) LocalDate.parse("2020-03-18").toEpochDay(),
                         (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 
1_000_000L),
                         
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")),
-                        
TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")),
+                        TimestampData.fromInstant(
+                                LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000)
+                                        .atZone(ZoneId.of("Asia/Shanghai"))
+                                        .toInstant()),
                         true));
         rows.add(
                 GenericRowData.of(
@@ -81,7 +87,10 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                         (int) LocalDate.parse("2020-03-19").toEpochDay(),
                         (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 
1_000_000L),
                         
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")),
-                        
TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")),
+                        TimestampData.fromInstant(
+                                LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000)
+                                        
.atZone(ZoneId.of("America/Los_Angeles"))
+                                        .toInstant()),
                         false));
     }
 
@@ -193,12 +202,26 @@ public class IndexGeneratorFactoryTest extends TestLogger 
{
     }
 
     @Test
-    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() {
+    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneUTC() {
+        assumeThat(ZoneId.systemDefault(), is(ZoneId.of("UTC")));
+
         IndexGenerator indexGenerator =
                 
IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", 
schema);
         indexGenerator.open();
-        Assert.assertEquals("my-index-2020_03_18_12_12_14Z", 
indexGenerator.generate(rows.get(0)));
-        Assert.assertEquals("my-index-2020_03_19_12_12_14Z", 
indexGenerator.generate(rows.get(1)));
+        Assert.assertEquals("my-index-2020_03_17_19_12_14Z", 
indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals("my-index-2020_03_20_03_22_14Z", 
indexGenerator.generate(rows.get(1)));
+    }
+
+    @Test
+    public void 
testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneWithSpecificTimeZone() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "my-index-{local_timestamp|}", schema, 
ZoneId.of("Europe/Berlin"));
+        indexGenerator.open();
+        Assert.assertEquals(
+                "my-index-2020_03_17_20_12_14+01", 
indexGenerator.generate(rows.get(0)));
+        Assert.assertEquals(
+                "my-index-2020_03_20_04_22_14+01", 
indexGenerator.generate(rows.get(1)));
     }
 
     @Test

Reply via email to