[
https://issues.apache.org/jira/browse/FLINK-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948783#comment-15948783
]
ASF GitHub Bot commented on FLINK-5625:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3651#discussion_r108886633
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
---
@@ -216,14 +217,73 @@ public void
testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
unixTimestamp);
- KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ try {
+
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
- try{
double value = Double.parseDouble(unixTimestamp);
if (value < 0) {
throw new NumberFormatException();
}
- } catch (Exception e){
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void testInvalidPatternForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial
timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"BASIC");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY,
"secretKey");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"AT_TIMESTAMP");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
"2016-03-14");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
"InvalidPattern");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void
testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Invalid value given for initial
timestamp for AT_TIMESTAMP initial position in stream.");
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"BASIC");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY,
"secretKey");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"AT_TIMESTAMP");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
"stillUnparsable");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
"yyyy-MM-dd");
+
+ KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+ }
+
+ @Test
+ public void
testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
+ String unixTimestamp = "2016-04-04";
+ String pattern = "yyyy-MM-dd";
+
+ Properties testConfig = new Properties();
+ testConfig.setProperty(ConsumerConfigConstants.AWS_REGION,
"us-east-1");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
"BASIC");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID,
"accessKeyId");
+
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY,
"secretKey");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"AT_TIMESTAMP");
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
unixTimestamp);
+
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
pattern);
+
+ try {
+
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+
+ SimpleDateFormat customDateFormat = new
SimpleDateFormat(pattern);
+ customDateFormat.parse(unixTimestamp);
--- End diff --
Are these 2 lines necessary?:
```
SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern);
customDateFormat.parse(unixTimestamp);
```
> Let Date format for timestamp-based start position in Kinesis consumer be
> configurable.
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-5625
> URL: https://issues.apache.org/jira/browse/FLINK-5625
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> Currently, the Kinesis consumer's Date format for timestamp-based start
> positions is fixed. It'll be nice to make this format configurable.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)