[FLINK-6211] [kinesis] Fix AT_TIMESTAMP config validation for 
FlinkKinesisConsumer

This closes #3636.
This closes #3637.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69843fef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69843fef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69843fef

Branch: refs/heads/table-retraction
Commit: 69843fefca49a9ce43db7aa4810716de72cc6b5f
Parents: cd55274
Author: Tony Wei <tony19920...@gmail.com>
Authored: Wed Mar 29 15:08:04 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Mar 29 22:24:44 2017 +0800

----------------------------------------------------------------------
 .../kinesis/util/KinesisConfigUtil.java         | 13 ++--
 .../kinesis/FlinkKinesisConsumerTest.java       | 65 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69843fef/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index eb29d78..59b8529 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -226,12 +226,15 @@ public class KinesisConfigUtil {
                if (config.containsKey(key)) {
                        try {
                                
initTimestampDateFormat.parse(config.getProperty(key));
-                               double value = 
Double.parseDouble(config.getProperty(key));
-                               if (value < 0) {
-                                       throw new NumberFormatException();
+                       } catch (ParseException parseException) {
+                               try {
+                                       double value = 
Double.parseDouble(config.getProperty(key));
+                                       if (value < 0) {
+                                               throw new 
NumberFormatException();
+                                       }
+                               } catch (NumberFormatException 
numberFormatException){
+                                       throw new 
IllegalArgumentException(message);
                                }
-                       } catch (ParseException | NumberFormatException e) {
-                               throw new IllegalArgumentException(message);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/69843fef/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 2cc0270..45eb1bd 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -40,12 +40,14 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Suite of FlinkKinesisConsumer tests for the methods called throughout the 
source life cycle.
@@ -165,6 +167,69 @@ public class FlinkKinesisConsumerTest {
        }
 
        @Test
+       public void testIllegalValueForInitialTimestampInConfig() {
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"-1.0");
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+       }
+
+       @Test
+       public void 
testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() {
+               String timestamp = "2016-04-04T19:58:46.480-00:00";
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
timestamp);
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+
+               try {
+                       
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
+               } catch (ParseException e){
+                       e.printStackTrace();
+                       fail();
+               }
+       }
+
+       @Test
+       public void 
testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() {
+               String unixTimestamp = "1459799926.480";
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
+               
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
+               
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
unixTimestamp);
+
+               KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+
+               try{
+                       double value = Double.parseDouble(unixTimestamp);
+                       if (value < 0) {
+                               throw new NumberFormatException();
+                       }
+               } catch (Exception e){
+                       e.printStackTrace();
+                       fail();
+               }
+       }
+
+       @Test
        public void 
testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("Invalid value given for describe 
stream operation base backoff milliseconds");

Reply via email to