[FLINK-6211] [kinesis] Fix AT_TIMESTAMP config validation for FlinkKinesisConsumer
This closes #3636. This closes #3637. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69843fef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69843fef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69843fef Branch: refs/heads/table-retraction Commit: 69843fefca49a9ce43db7aa4810716de72cc6b5f Parents: cd55274 Author: Tony Wei <tony19920...@gmail.com> Authored: Wed Mar 29 15:08:04 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Mar 29 22:24:44 2017 +0800 ---------------------------------------------------------------------- .../kinesis/util/KinesisConfigUtil.java | 13 ++-- .../kinesis/FlinkKinesisConsumerTest.java | 65 ++++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/69843fef/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index eb29d78..59b8529 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -226,12 +226,15 @@ public class KinesisConfigUtil { if (config.containsKey(key)) { try { initTimestampDateFormat.parse(config.getProperty(key)); - double value = Double.parseDouble(config.getProperty(key)); - if (value < 0) { - throw new NumberFormatException(); + } catch (ParseException parseException) { + try { + double value = Double.parseDouble(config.getProperty(key)); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException numberFormatException){ + throw new IllegalArgumentException(message); } - } catch (ParseException | NumberFormatException e) { - throw new IllegalArgumentException(message); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/69843fef/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 2cc0270..45eb1bd 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -40,12 +40,14 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.text.ParseException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.UUID; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. @@ -165,6 +167,69 @@ public class FlinkKinesisConsumerTest { } @Test + public void testIllegalValueForInitialTimestampInConfig() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0"); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + } + + @Test + public void testDateStringInValidateOptionDatePropertyForInitialTimestampInConfig() { + String timestamp = "2016-04-04T19:58:46.480-00:00"; + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + + try { + KinesisConfigUtil.initTimestampDateFormat.parse(timestamp); + } catch (ParseException e){ + e.printStackTrace(); + fail(); + } + } + + @Test + public void testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInConfig() { + String unixTimestamp = "1459799926.480"; + + Properties testConfig = new Properties(); + testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); + testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); + testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); + testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); + + KinesisConfigUtil.validateConsumerConfiguration(testConfig); + + try{ + double value = Double.parseDouble(unixTimestamp); + if (value < 0) { + throw new NumberFormatException(); + } + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + @Test public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");