This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git

commit 6e61d797872f1e49d1840c3339464bb55c3de2f6
Author: Alexander Preuß <11444089+alp...@users.noreply.github.com>
AuthorDate: Mon Sep 12 16:09:44 2022 +0200

    [FLINK-26810] Use local timezone for TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index
---
 .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++--
 .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++--
 .../elasticsearch/table/IndexGeneratorTest.java    | 43 ++++++++++++++++++++++
 .../table/IndexGeneratorFactoryTest.java           | 31 ++++++++++++++--
 4 files changed, 82 insertions(+), 12 deletions(-)

diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
index ec2a006..92886f4 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java
@@ -33,7 +33,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -133,7 +132,8 @@ final class IndexGeneratorFactory {
             final String dateTimeFormat =
                     indexHelper.extractDateFormat(index, 
indexFieldLogicalTypeRoot);
             DynamicFormatter formatFunction =
-                    createFormatFunction(indexFieldType, 
indexFieldLogicalTypeRoot);
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, 
localTimeZoneId);
 
             return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
                 @Override
@@ -163,7 +163,9 @@ final class IndexGeneratorFactory {
     }
 
     private static DynamicFormatter createFormatFunction(
-            LogicalType indexFieldType, LogicalTypeRoot 
indexFieldLogicalTypeRoot) {
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
         switch (indexFieldLogicalTypeRoot) {
             case DATE:
                 return (value, dateTimeFormatter) -> {
@@ -186,7 +188,7 @@ final class IndexGeneratorFactory {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return (value, dateTimeFormatter) -> {
                     TimestampData indexField = (TimestampData) value;
-                    return 
indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                    return 
indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
                 };
             default:
                 throw new TableException(
diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
index 48f0107..8347a47 100644
--- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
+++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -34,7 +34,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -130,7 +129,8 @@ final class IndexGeneratorFactory {
             final String dateTimeFormat =
                     indexHelper.extractDateFormat(index, 
indexFieldLogicalTypeRoot);
             DynamicFormatter formatFunction =
-                    createFormatFunction(indexFieldType, 
indexFieldLogicalTypeRoot);
+                    createFormatFunction(
+                            indexFieldType, indexFieldLogicalTypeRoot, 
localTimeZoneId);
 
             return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
                 @Override
@@ -160,7 +160,9 @@ final class IndexGeneratorFactory {
     }
 
     private static DynamicFormatter createFormatFunction(
-            LogicalType indexFieldType, LogicalTypeRoot 
indexFieldLogicalTypeRoot) {
+            LogicalType indexFieldType,
+            LogicalTypeRoot indexFieldLogicalTypeRoot,
+            ZoneId localTimeZoneId) {
         switch (indexFieldLogicalTypeRoot) {
             case DATE:
                 return (value, dateTimeFormatter) -> {
@@ -183,7 +185,7 @@ final class IndexGeneratorFactory {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return (value, dateTimeFormatter) -> {
                     TimestampData indexField = (TimestampData) value;
-                    return 
indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                    return 
indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter);
                 };
             default:
                 throw new TableException(
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
index 0eef8a9..44062a3 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java
@@ -34,12 +34,14 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.assumingThat;
 
 /** Suite tests for {@link IndexGenerator}. */
 public class IndexGeneratorTest {
@@ -55,6 +57,7 @@ public class IndexGeneratorTest {
                     "local_datetime",
                     "local_date",
                     "local_time",
+                    "local_timestamp",
                     "note",
                     "status");
 
@@ -69,6 +72,7 @@ public class IndexGeneratorTest {
                     DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class),
                     DataTypes.DATE().bridgedTo(LocalDate.class),
                     DataTypes.TIME().bridgedTo(LocalTime.class),
+                    DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
                     DataTypes.STRING(),
                     DataTypes.BOOLEAN());
 
@@ -87,6 +91,10 @@ public class IndexGeneratorTest {
                                     LocalDateTime.of(2020, 3, 18, 12, 12, 14, 
1000)),
                             (int) LocalDate.of(2020, 3, 18).toEpochDay(),
                             (int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+                            TimestampData.fromInstant(
+                                    LocalDateTime.of(2020, 3, 18, 3, 12, 14, 
1000)
+                                            .atZone(ZoneId.of("Asia/Shanghai"))
+                                            .toInstant()),
                             "test1",
                             true),
                     GenericRowData.of(
@@ -102,9 +110,44 @@ public class IndexGeneratorTest {
                                     LocalDateTime.of(2020, 3, 19, 12, 22, 14, 
1000)),
                             (int) LocalDate.of(2020, 3, 19).toEpochDay(),
                             (int) (LocalTime.of(12, 13, 14, 
2000).toNanoOfDay() / 1_000_000L),
+                            TimestampData.fromInstant(
+                                    LocalDateTime.of(2020, 3, 19, 20, 22, 14, 
1000)
+                                            
.atZone(ZoneId.of("America/Los_Angeles"))
+                                            .toInstant()),
                             "test2",
                             false));
 
+    @Test
+    public void testDynamicIndexFromTimestampTzUTC() {
+        assumingThat(
+                ZoneId.systemDefault().equals(ZoneId.of("UTC")),
+                () -> {
+                    IndexGenerator indexGenerator =
+                            IndexGeneratorFactory.createIndexGenerator(
+                                    "{local_timestamp|yyyy_MM_dd_HH-ss}_index",
+                                    fieldNames,
+                                    dataTypes);
+                    indexGenerator.open();
+                    assertThat(indexGenerator.generate(rows.get(0)))
+                            .isEqualTo("2020_03_17_19-14_index");
+                    assertThat(indexGenerator.generate(rows.get(1)))
+                            .isEqualTo("2020_03_20_03-14_index");
+                });
+    }
+
+    @Test
+    public void testDynamicIndexFromTimestampTzWithSpecificTimezone() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "{local_timestamp|yyyy_MM_dd_HH-ss}_index",
+                        fieldNames,
+                        dataTypes,
+                        ZoneId.of("Europe/Berlin"));
+        indexGenerator.open();
+        
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("2020_03_17_20-14_index");
+        
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("2020_03_20_04-14_index");
+    }
+
     @Test
     public void testDynamicIndexFromTimestamp() {
         IndexGenerator indexGenerator =
diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
index 5941a0d..5b90d15 100644
--- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
+++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
@@ -31,10 +31,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.sql.Timestamp;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,6 +42,9 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assume.assumeThat;
+
 /** Tests for {@link IndexGeneratorFactory}. */
 public class IndexGeneratorFactoryTest extends TestLogger {
 
@@ -71,7 +74,10 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                         (int) LocalDate.parse("2020-03-18").toEpochDay(),
                         (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 
1_000_000L),
                         
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")),
-                        
TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")),
+                        TimestampData.fromInstant(
+                                LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000)
+                                        .atZone(ZoneId.of("Asia/Shanghai"))
+                                        .toInstant()),
                         true));
         rows.add(
                 GenericRowData.of(
@@ -81,7 +87,10 @@ public class IndexGeneratorFactoryTest extends TestLogger {
                         (int) LocalDate.parse("2020-03-19").toEpochDay(),
                         (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 
1_000_000L),
                         
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")),
-                        
TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")),
+                        TimestampData.fromInstant(
+                                LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000)
+                                        
.atZone(ZoneId.of("America/Los_Angeles"))
+                                        .toInstant()),
                         false));
     }
 
@@ -194,7 +203,9 @@ public class IndexGeneratorFactoryTest extends TestLogger {
     }
 
     @Test
-    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() {
+    public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneUTC() {
+        assumeThat(ZoneId.systemDefault(), is(ZoneId.of("UTC")));
+
         IndexGenerator indexGenerator =
                 
IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", 
schema);
         indexGenerator.open();
@@ -202,6 +213,18 @@ public class IndexGeneratorFactoryTest extends TestLogger {
         
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020_03_20_03_22_14Z");
     }
 
+    @Test
+    public void 
testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneWithSpecificTimeZone() {
+        IndexGenerator indexGenerator =
+                IndexGeneratorFactory.createIndexGenerator(
+                        "my-index-{local_timestamp|}", schema, 
ZoneId.of("Europe/Berlin"));
+        indexGenerator.open();
+        assertThat(indexGenerator.generate(rows.get(0)))
+                .isEqualTo("my-index-2020_03_17_20_12_14+01");
+        assertThat(indexGenerator.generate(rows.get(1)))
+                .isEqualTo("my-index-2020_03_20_04_22_14+01");
+    }
+
     @Test
     public void testGeneralDynamicIndex() {
         IndexGenerator indexGenerator =

Reply via email to