This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 051825bd002 [FLINK-26810][connectors/elasticsearch] Use local timezone for TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index 051825bd002 is described below commit 051825bd00275cbea2a684ad8085b9563aa6fb47 Author: Alexander Preuß <11444089+alp...@users.noreply.github.com> AuthorDate: Thu Mar 31 14:16:11 2022 +0200 [FLINK-26810][connectors/elasticsearch] Use local timezone for TIMESTAMP_WITH_LOCAL_TIMEZONE fields in dynamic index --- .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++-- .../elasticsearch/table/IndexGeneratorFactory.java | 10 +++-- .../elasticsearch/table/IndexGeneratorTest.java | 44 ++++++++++++++++++++++ .../table/IndexGeneratorFactoryTest.java | 35 ++++++++++++++--- 4 files changed, 85 insertions(+), 14 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java index ec2a0069365..92886f40e09 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java @@ -33,7 +33,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; @@ -133,7 +132,8 @@ final class IndexGeneratorFactory { final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot); DynamicFormatter formatFunction = - createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot); + createFormatFunction( + indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId); return new AbstractTimeIndexGenerator(index, dateTimeFormat) { @Override @@ -163,7 +163,9 @@ final class IndexGeneratorFactory { } private static DynamicFormatter createFormatFunction( - LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) { + LogicalType indexFieldType, + LogicalTypeRoot indexFieldLogicalTypeRoot, + ZoneId localTimeZoneId) { switch (indexFieldLogicalTypeRoot) { case DATE: return (value, dateTimeFormatter) -> { @@ -186,7 +188,7 @@ final class IndexGeneratorFactory { case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return (value, dateTimeFormatter) -> { TimestampData indexField = (TimestampData) value; - return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter); + return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter); }; default: throw new TableException( diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java index 48f0107b714..8347a479e5a 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java @@ -34,7 +34,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; @@ -130,7 +129,8 @@ final class IndexGeneratorFactory { final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot); DynamicFormatter formatFunction = - createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot); + createFormatFunction( + indexFieldType, indexFieldLogicalTypeRoot, localTimeZoneId); return new AbstractTimeIndexGenerator(index, dateTimeFormat) { @Override @@ -160,7 +160,9 @@ final class IndexGeneratorFactory { } private static DynamicFormatter createFormatFunction( - LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) { + LogicalType indexFieldType, + LogicalTypeRoot indexFieldLogicalTypeRoot, + ZoneId localTimeZoneId) { switch (indexFieldLogicalTypeRoot) { case DATE: return (value, dateTimeFormatter) -> { @@ -183,7 +185,7 @@ final class IndexGeneratorFactory { case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return (value, dateTimeFormatter) -> { TimestampData indexField = (TimestampData) value; - return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter); + return indexField.toInstant().atZone(localTimeZoneId).format(dateTimeFormatter); }; default: throw new TableException( diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java index 7840bfa81a8..8760d0d4a43 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorTest.java @@ -35,11 +35,14 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.UnsupportedTemporalTypeException; import java.util.Arrays; import java.util.List; +import static org.junit.jupiter.api.Assumptions.assumingThat; + /** Suite tests for {@link IndexGenerator}. */ public class IndexGeneratorTest { @@ -54,6 +57,7 @@ public class IndexGeneratorTest { "local_datetime", "local_date", "local_time", + "local_timestamp", "note", "status"); @@ -68,6 +72,7 @@ public class IndexGeneratorTest { DataTypes.TIMESTAMP().bridgedTo(LocalDateTime.class), DataTypes.DATE().bridgedTo(LocalDate.class), DataTypes.TIME().bridgedTo(LocalTime.class), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), DataTypes.STRING(), DataTypes.BOOLEAN()); @@ -86,6 +91,10 @@ public class IndexGeneratorTest { LocalDateTime.of(2020, 3, 18, 12, 12, 14, 1000)), (int) LocalDate.of(2020, 3, 18).toEpochDay(), (int) (LocalTime.of(12, 13, 14, 2000).toNanoOfDay() / 1_000_000L), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant()), "test1", true), GenericRowData.of( @@ -101,9 +110,44 @@ public class IndexGeneratorTest { LocalDateTime.of(2020, 3, 19, 12, 22, 14, 1000)), (int) LocalDate.of(2020, 3, 19).toEpochDay(), (int) (LocalTime.of(12, 13, 14, 2000).toNanoOfDay() / 1_000_000L), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant()), "test2", false)); + @Test + public void testDynamicIndexFromTimestampTzUTC() { + assumingThat( + ZoneId.systemDefault().equals(ZoneId.of("UTC")), + () -> { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "{local_timestamp|yyyy_MM_dd_HH-ss}_index", + fieldNames, + dataTypes); + indexGenerator.open(); + Assertions.assertEquals( + "2020_03_17_19-14_index", indexGenerator.generate(rows.get(0))); + Assertions.assertEquals( + "2020_03_20_03-14_index", indexGenerator.generate(rows.get(1))); + }); + } + + @Test + public void testDynamicIndexFromTimestampTzWithSpecificTimezone() { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "{local_timestamp|yyyy_MM_dd_HH-ss}_index", + fieldNames, + dataTypes, + ZoneId.of("Europe/Berlin")); + indexGenerator.open(); + Assertions.assertEquals("2020_03_17_20-14_index", indexGenerator.generate(rows.get(0))); + Assertions.assertEquals("2020_03_20_04-14_index", indexGenerator.generate(rows.get(1))); + } + @Test public void testDynamicIndexFromTimestamp() { IndexGenerator indexGenerator = diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java index a5f77595128..bbb5081b733 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java @@ -32,16 +32,19 @@ import org.junit.Before; import org.junit.Test; import java.sql.Timestamp; -import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.temporal.UnsupportedTemporalTypeException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assume.assumeThat; + /** Tests for {@link IndexGeneratorFactory}. */ public class IndexGeneratorFactoryTest extends TestLogger { @@ -71,7 +74,10 @@ public class IndexGeneratorFactoryTest extends TestLogger { (int) LocalDate.parse("2020-03-18").toEpochDay(), (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 1_000_000L), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")), - TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 18, 3, 12, 14, 1000) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant()), true)); rows.add( GenericRowData.of( @@ -81,7 +87,10 @@ public class IndexGeneratorFactoryTest extends TestLogger { (int) LocalDate.parse("2020-03-19").toEpochDay(), (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 1_000_000L), TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")), - TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")), + TimestampData.fromInstant( + LocalDateTime.of(2020, 3, 19, 20, 22, 14, 1000) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant()), false)); } @@ -193,12 +202,26 @@ public class IndexGeneratorFactoryTest extends TestLogger { } @Test - public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() { + public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneUTC() { + assumeThat(ZoneId.systemDefault(), is(ZoneId.of("UTC"))); + IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator("my-index-{local_timestamp|}", schema); indexGenerator.open(); - Assert.assertEquals("my-index-2020_03_18_12_12_14Z", indexGenerator.generate(rows.get(0))); - Assert.assertEquals("my-index-2020_03_19_12_12_14Z", indexGenerator.generate(rows.get(1))); + Assert.assertEquals("my-index-2020_03_17_19_12_14Z", indexGenerator.generate(rows.get(0))); + Assert.assertEquals("my-index-2020_03_20_03_22_14Z", indexGenerator.generate(rows.get(1))); + } + + @Test + public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZoneWithSpecificTimeZone() { + IndexGenerator indexGenerator = + IndexGeneratorFactory.createIndexGenerator( + "my-index-{local_timestamp|}", schema, ZoneId.of("Europe/Berlin")); + indexGenerator.open(); + Assert.assertEquals( + "my-index-2020_03_17_20_12_14+01", indexGenerator.generate(rows.get(0))); + Assert.assertEquals( + "my-index-2020_03_20_04_22_14+01", indexGenerator.generate(rows.get(1))); } @Test