Repository: flink
Updated Branches:
  refs/heads/table-retraction ee033c903 -> 89d9dec38


[FLINK-5625] [kinesis] Configurable date format for timestamp-based start 
position in FlinkKinesisConsumer

This closes #3651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a119a30d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a119a30d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a119a30d

Branch: refs/heads/table-retraction
Commit: a119a30da91bc88b6b0242622adff2189b0a8fa4
Parents: ee033c9
Author: Tony Wei <tony19920...@gmail.com>
Authored: Thu Mar 30 09:48:43 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Fri Mar 31 12:32:19 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kinesis.md                  |  8 ++-
 .../kinesis/config/ConsumerConfigConstants.java |  7 +-
 .../kinesis/internals/ShardConsumer.java        | 12 +++-
 .../kinesis/util/KinesisConfigUtil.java         | 23 +++---
 .../kinesis/FlinkKinesisConsumerTest.java       | 75 ++++++++++++++++----
 5 files changed, 95 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/docs/dev/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kinesis.md b/docs/dev/connectors/kinesis.md
index 59f3d61..ef1afca 100644
--- a/docs/dev/connectors/kinesis.md
+++ b/docs/dev/connectors/kinesis.md
@@ -121,9 +121,11 @@ one of the following values in the provided configuration 
properties (the naming
 - `LATEST`: read all shards of all streams starting from the latest record.
 - `TRIM_HORIZON`: read all shards of all streams starting from the earliest 
record possible (data may be trimmed by Kinesis depending on the retention 
settings).
 - `AT_TIMESTAMP`: read all shards of all streams starting from a specified 
timestamp. The timestamp must also be specified in the configuration
-properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, either in the date pattern
-`yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, `2016-04-04T19:58:46.480-00:00`), 
or a non-negative double value representing the number of seconds
-that has elapsed since the Unix epoch (for example, `1459799926.480`).
+properties by providing a value for 
`ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following 
date patterns:
+    - a non-negative double value representing the number of seconds that has 
elapsed since the Unix epoch (for example, `1459799926.480`).
+    - a date string in a user-defined pattern; the pattern must be valid for `SimpleDateFormat` 
and is supplied via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
+    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, 
the default pattern `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` is used
+    (for example, the timestamp value `2016-04-04` with the user-supplied pattern `yyyy-MM-dd`, 
or the timestamp value `2016-04-04T19:58:46.480-00:00` when no pattern is 
given).
 
 #### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 4ffe0ad..7c31af4 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -56,9 +56,12 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
        /** The initial position to start reading Kinesis streams from (LATEST 
is used if not set) */
        public static final String STREAM_INITIAL_POSITION = 
"flink.stream.initpos";
 
-       /** The initial timestamp to start reading Kinesis stream from (when 
AT_TIMESTAMP is set for STREAM_INITIAL_POSITION */
+       /** The initial timestamp to start reading Kinesis stream from (when 
AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
        public static final String STREAM_INITIAL_TIMESTAMP = 
"flink.stream.initpos.timestamp";
 
+       /** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+       public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
+
        /** The base backoff time between each describeStream attempt */
        public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
 
@@ -107,6 +110,8 @@ public class ConsumerConfigConstants extends 
AWSConfigConstants {
 
        public static final String DEFAULT_STREAM_INITIAL_POSITION = 
InitialPosition.LATEST.toString();
 
+       public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
        public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
 
        public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index f6c53ce..ca85854 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +37,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -115,9 +115,15 @@ public class ShardConsumer<T> implements Runnable {
 
                if 
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
 {
                        String timestamp = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+
                        try {
-                               this.initTimestamp = 
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-                       } catch (ParseException e) {
+                               String format = 
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
+                                       
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+                               SimpleDateFormat customDateFormat = new 
SimpleDateFormat(format);
+                               this.initTimestamp = 
customDateFormat.parse(timestamp);
+                       } catch (IllegalArgumentException | 
NullPointerException exception) {
+                               throw new IllegalArgumentException(exception);
+                       } catch (ParseException exception) {
                                this.initTimestamp = new Date((long) 
(Double.parseDouble(timestamp) * 1000));
                        }
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 59b8529..244f5a5 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -37,8 +37,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Utilities for Flink Kinesis connector configuration.
  */
 public class KinesisConfigUtil {
-       public static SimpleDateFormat initTimestampDateFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
-
        /**
         * Validate configuration properties for {@link FlinkKinesisConsumer}.
         */
@@ -67,7 +65,9 @@ public class KinesisConfigUtil {
                                        throw new 
IllegalArgumentException("Please set value for initial timestamp ('"
                                                + 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP 
initial position.");
                                }
-                               validateOptionalDateProperty(config, 
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+                               validateOptionalDateProperty(config,
+                                       
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
+                                       
config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
                                        "Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
                                                + "Must be a valid format: 
yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 
2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
                        }
@@ -222,17 +222,20 @@ public class KinesisConfigUtil {
                }
        }
 
-       private static void validateOptionalDateProperty(Properties config, 
String key, String message) {
-               if (config.containsKey(key)) {
+       private static void validateOptionalDateProperty(Properties config, 
String timestampKey, String format, String message) {
+               if (config.containsKey(timestampKey)) {
                        try {
-                               
initTimestampDateFormat.parse(config.getProperty(key));
-                       } catch (ParseException parseException) {
+                               SimpleDateFormat customDateFormat = new 
SimpleDateFormat(format);
+                               
customDateFormat.parse(config.getProperty(timestampKey));
+                       } catch (IllegalArgumentException | 
NullPointerException exception) {
+                               throw new IllegalArgumentException(message);
+                       } catch (ParseException exception) {
                                try {
-                                       double value = 
Double.parseDouble(config.getProperty(key));
+                                       double value = 
Double.parseDouble(config.getProperty(timestampKey));
                                        if (value < 0) {
-                                               throw new 
NumberFormatException();
+                                               throw new 
IllegalArgumentException(message);
                                        }
-                               } catch (NumberFormatException 
numberFormatException){
+                               } catch (NumberFormatException 
numberFormatException) {
                                        throw new 
IllegalArgumentException(message);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a119a30d/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 45eb1bd..741f0ca 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -40,7 +40,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -183,7 +183,7 @@ public class FlinkKinesisConsumerTest {
        }
 
        @Test
-       public void 
testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() {
+       public void testDateStringForValidateOptionDateProperty() {
                String timestamp = "2016-04-04T19:58:46.480-00:00";
 
                Properties testConfig = new Properties();
@@ -194,18 +194,16 @@ public class FlinkKinesisConsumerTest {
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
timestamp);
 
-               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
-
                try {
-                       
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-               } catch (ParseException e){
+                       
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail();
                }
        }
 
        @Test
-       public void 
testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() {
+       public void testUnixTimestampForValidateOptionDateProperty() {
                String unixTimestamp = "1459799926.480";
 
                Properties testConfig = new Properties();
@@ -216,14 +214,65 @@ public class FlinkKinesisConsumerTest {
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
unixTimestamp);
 
+               try {
+                       
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail();
+               }
+       }
+
+       @Test
+       public void testInvalidPatternForInitialTimestampInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"2016-03-14");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
"InvalidPattern");
+
                KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
 
-               try{
-                       double value = Double.parseDouble(unixTimestamp);
-                       if (value < 0) {
-                               throw new NumberFormatException();
-                       }
-               } catch (Exception e){
+       @Test
+       public void 
testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"stillUnparsable");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
"yyyy-MM-dd");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void 
testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+               String unixTimestamp = "2016-04-04";
+               String pattern = "yyyy-MM-dd";
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
unixTimestamp);
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
pattern);
+
+               try {
+                       
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail();
                }

Reply via email to