This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
commit 6e61d797872f1e49d1840c3339464bb55c3de2f6 Author: Alexander Preuß <11444089+alp...@users.noreply.github.com> AuthorDate: Mon Sep 12 16:09:44 2022 +0200 [FLINK-26810] Use local timezone for TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index --- .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++-- .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++-- .../elasticsearch/table/IndexGeneratorTest.java | 43 ++++++++++++++++++++++ .../table/IndexGeneratorFactoryTest.java | 31 ++++++++++++++-- 4 files changed, 82 insertions(+), 12 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java index ec2a006..92886f4 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java @@ -33,7 +33,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; @@ -133,7 +132,8 @@ final class IndexGeneratorFactory { final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot); DynamicFormatter formatFunction = - createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot); + createFormatFunction( + indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId); return new AbstractTimeIndexGenerator(index, dateTimeFormat) { @Override @@ -163,7 +163,9 @@ final class IndexGeneratorFactory { } private static DynamicFormatter createFormatFunction( - LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) { + LogicalType 
indexFieldType, + LogicalTypeRoot indexFieldLogicalTypeRoot, + ZoneId localTimeZoneId) { switch (indexFieldLogicalTypeRoot) { case DATE: return (value, dateTimeFormatter) -> { @@ -186,7 +188,7 @@ final class IndexGeneratorFactory { case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return (value, dateTimeFormatter) -> { TimestampData indexField = (TimestampData) value; - return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter); + return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter); }; default: throw new TableException( diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java index 48f0107..8347a47 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java @@ -34,7 +34,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; @@ -130,7 +129,8 @@ final class IndexGeneratorFactory { final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot); DynamicFormatter formatFunction = - createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot); + createFormatFunction( + indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId); return new AbstractTimeIndexGenerator(index, dateTimeFormat) { @Override @@ -160,7 +160,9 @@ final class IndexGeneratorFactory { } private static DynamicFormatter createFormatFunction( - LogicalType indexFieldType, LogicalTypeRoot 
indexFieldLogicalTypeRoot) { + LogicalType indexFieldType, + LogicalTypeRoot indexFieldLogicalTypeRoot, + ZoneId localTimeZoneId) { switch (indexFieldLogicalTypeRoot) { case DATE: return (value, dateTimeFormatter) -> { @@ -183,7 +185,7 @@ final class IndexGeneratorFactory { case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return (value, dateTimeFormatter) -> { TimestampData indexField = (TimestampData) value; - return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter); + return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter); }; default: throw new TableException( diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java index 0eef8a9..44062a3 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java @@ -34,12 +34,14 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assumptions.assumingThat; /** Suite tests for {@link IndexGenerator}. 
*/ public class IndexGeneratorTest { @@ -55,6 +57,7 @@ public class IndexGeneratorTest { "local_datetime", "local_date", "local_time", + "local_timestamp", "note", "status"); @@ -69,6 +72,7 @@ public class IndexGeneratorTest { DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class), DataTypes.DATE().bridgedTo(LocalDate.class), DataTypes.TIME().bridgedTo(LocalTime.class), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), DataTypes.STRING(), DataTypes.BOOLEAN()); @@ -87,6 +91,10 @@ public class IndexGeneratorTest { LocalDateTime.of(2020, 3, 18, 12, 12, 14, 1000)), (int) LocalDate.of(2020, 3, 18).toEpochDay(), (int) (LocalTime.of(12, 13, 14, 2000).toNanoOfDay() / 1_000_000L), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant()), "test1", true), GenericRowData.of( @@ -102,9 +110,44 @@ public class IndexGeneratorTest { LocalDateTime.of(2020, 3, 19, 12, 22, 14, 1000)), (int) LocalDate.of(2020, 3, 19).toEpochDay(), (int) (LocalTime.of(12, 13, 14, 2000).toNanoOfDay() / 1_000_000L), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant()), "test2", false)); + @Test + public void testDynamicIndexFromTimestampTzUTC() { + assumingThat( + ZoneId.systemDefault().equals(ZoneId.of("UTC")), + () -> { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "{local_timestamp|yyyy_MM_dd_HH-ss}_index", + fieldNames, + dataTypes); + indexGenerator.open(); + assertThat(indexGenerator.generate(rows.get(0))) + .isEqualTo("2020_03_17_19-14_index"); + assertThat(indexGenerator.generate(rows.get(1))) + .isEqualTo("2020_03_20_03-14_index"); + }); + } + + @Test + public void testDynamicIndexFromTimestampTzWithSpecificTimezone() { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "{local_timestamp|yyyy_MM_dd_HH-ss}_index", + fieldNames, + dataTypes, + ZoneId.of("Europe/Berlin")); + 
indexGenerator.open(); + assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("2020_03_17_20-14_index"); + assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("2020_03_20_04-14_index"); + } + @Test public void testDynamicIndexFromTimestamp() { IndexGenerator indexGenerator = diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java index 5941a0d..5b90d15 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java @@ -31,10 +31,10 @@ import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; -import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; @@ -42,6 +42,9 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assume.assumeThat; + /** Tests for {@link IndexGeneratorFactory}. 
*/ public class IndexGeneratorFactoryTest extends TestLogger { @@ -71,7 +74,10 @@ public class IndexGeneratorFactoryTest extends TestLogger { (int) LocalDate.parse("2020-03-18").toEpochDay(), (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 1_000_000L), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")), - TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant()), true)); rows.add( GenericRowData.of( @@ -81,7 +87,10 @@ public class IndexGeneratorFactoryTest extends TestLogger { (int) LocalDate.parse("2020-03-19").toEpochDay(), (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 1_000_000L), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")), - TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant()), false)); } @@ -194,7 +203,9 @@ public class IndexGeneratorFactoryTest extends TestLogger { } @Test - public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() { + public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneUTC() { + assumeThat(ZoneId.systemDefault(), is(ZoneId.of("UTC"))); + IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", schema); indexGenerator.open(); @@ -202,6 +213,18 @@ public class IndexGeneratorFactoryTest extends TestLogger { assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020_03_20_03_22_14Z"); } + @Test + public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneWithSpecificTimeZone() { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "my-index-{local_timestamp|}", schema, ZoneId.of("Europe/Berlin")); + indexGenerator.open(); + 
assertThat(indexGenerator.generate(rows.get(0))) + .isEqualTo("my-index-2020_03_17_20_12_14+01"); + assertThat(indexGenerator.generate(rows.get(1))) + .isEqualTo("my-index-2020_03_20_04_22_14+01"); + } + @Test public void testGeneralDynamicIndex() { IndexGenerator indexGenerator =