Repository: flink Updated Branches: refs/heads/table-retraction ee033c903 -> 89d9dec38
[FLINK-5625] [kinesis] Configurable date format for timestamp-based start position in FlinkKinesisConsumer This closes #3651. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a119a30d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a119a30d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a119a30d Branch: refs/heads/table-retraction Commit: a119a30da91bc88b6b0242622adff2189b0a8fa4 Parents: ee033c9 Author: Tony Wei <tony19920...@gmail.com> Authored: Thu Mar 30 09:48:43 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Fri Mar 31 12:32:19 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kinesis.md | 8 ++- .../kinesis/config/ConsumerConfigConstants.java | 7 +- .../kinesis/internals/ShardConsumer.java | 12 +++- .../kinesis/util/KinesisConfigUtil.java | 23 +++--- .../kinesis/FlinkKinesisConsumerTest.java | 75 ++++++++++++++++---- 5 files changed, 95 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/docs/dev/connectors/kinesis.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md index 59f3d61..ef1afca 100644 --- a/docs/dev/connectors/kinesis.md +++ b/docs/dev/connectors/kinesis.md @@ -121,9 +121,11 @@ one of the following values in the provided configuration properties (the naming - `LATEST`: read all shards of all streams starting from the latest record. - `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings). - `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. 
The timestamp must also be specified in the configuration -properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern -`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), or a non-negative double value representing the number of seconds -that has elapsed since the Unix epoch (for example, `1459799926.480`). +properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP` in one of the following formats: + - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`). + - a date string matching a user-defined pattern, which must be a valid `SimpleDateFormat` pattern supplied via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. + If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not set, the default pattern `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` is used + (for example, the value `2016-04-04` with the user-supplied pattern `yyyy-MM-dd`, or the value `2016-04-04T19:58:46.480-00:00` when no pattern is given). 
#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 4ffe0ad..7c31af4 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -56,9 +56,12 @@ public class ConsumerConfigConstants extends AWSConfigConstants { /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; - /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */ + /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */ public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp"; + /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */ + public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; + /** The base backoff time between each describeStream attempt */ public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; @@ -107,6 +110,8 @@ public class ConsumerConfigConstants 
extends AWSConfigConstants { public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index f6c53ce..ca85854 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -30,7 +30,6 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +37,7 @@ import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Properties; @@ -115,9 +115,15 @@ public class ShardConsumer<T> implements 
Runnable { if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) { String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP); + try { - this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e) { + String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, + ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT); + SimpleDateFormat customDateFormat = new SimpleDateFormat(format); + this.initTimestamp = customDateFormat.parse(timestamp); + } catch (IllegalArgumentException | NullPointerException exception) { + throw new IllegalArgumentException(exception); + } catch (ParseException exception) { this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000)); } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 59b8529..244f5a5 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -37,8 +37,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Utilities for Flink Kinesis connector configuration. 
*/ public class KinesisConfigUtil { - public static SimpleDateFormat initTimestampDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - /** * Validate configuration properties for {@link FlinkKinesisConsumer}. */ @@ -67,7 +65,9 @@ public class KinesisConfigUtil { throw new IllegalArgumentException("Please set value for initial timestamp ('" + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); } - validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + validateOptionalDateProperty(config, + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, + config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT), "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. " + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 ."); } @@ -222,17 +222,20 @@ public class KinesisConfigUtil { } } - private static void validateOptionalDateProperty(Properties config, String key, String message) { - if (config.containsKey(key)) { + private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) { + if (config.containsKey(timestampKey)) { try { - initTimestampDateFormat.parse(config.getProperty(key)); - } catch (ParseException parseException) { + SimpleDateFormat customDateFormat = new SimpleDateFormat(format); + customDateFormat.parse(config.getProperty(timestampKey)); + } catch (IllegalArgumentException | NullPointerException exception) { + throw new IllegalArgumentException(message); + } catch (ParseException exception) { try { - double value = Double.parseDouble(config.getProperty(key)); + double value = Double.parseDouble(config.getProperty(timestampKey)); if (value < 0) { - throw new NumberFormatException(); + throw new 
IllegalArgumentException(message); } - } catch (NumberFormatException numberFormatException){ + } catch (NumberFormatException numberFormatException) { throw new IllegalArgumentException(message); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 45eb1bd..741f0ca 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -40,7 +40,7 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -183,7 +183,7 @@ public class FlinkKinesisConsumerTest { } @Test - public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testDateStringForValidateOptionDateProperty() { String timestamp = "2016-04-04T19:58:46.480-00:00"; Properties testConfig = new Properties(); @@ -194,18 +194,16 @@ public class FlinkKinesisConsumerTest { testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); - KinesisConfigUtil.validateConsumerConfiguration(testConfig); - try { - 
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); - } catch (ParseException e){ + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } catch (Exception e) { e.printStackTrace(); fail(); } } @Test - public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() { + public void testUnixTimestampForValidateOptionDateProperty() { String unixTimestamp = "1459799926.480"; Properties testConfig = new Properties(); @@ -216,14 +214,65 @@ public class FlinkKinesisConsumerTest { testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + @Test + public void testInvalidPatternForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern"); + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } - try{ - double value = Double.parseDouble(unixTimestamp); - if (value < 0) { - throw new NumberFormatException(); - } - } catch (Exception e){ + @Test + public void 
testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() { + String unixTimestamp = "2016-04-04"; + String pattern = "yyyy-MM-dd"; + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); + testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern); + + try { + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } catch (Exception e) { e.printStackTrace(); fail(); }