[ https://issues.apache.org/jira/browse/FLINK-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948784#comment-15948784 ]
ASF GitHub Bot commented on FLINK-5625:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3651#discussion_r108885717
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -115,9 +115,15 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
         if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
             String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
+
             try {
-                this.initTimestamp = KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
-            } catch (ParseException e) {
+                String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
+                    ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
+                SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
+                this.initTimestamp = customDateFormat.parse(timestamp);
+            } catch (IllegalArgumentException | NullPointerException exception) {
+                throw new IllegalArgumentException(exception.getCause());
--- End diff --
I think we should wrap the whole exception instance here, not only its
cause.
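To illustrate the difference between passing `exception.getCause()` and passing the exception itself, here is a minimal standalone sketch using only JDK classes. The class name, the property keys, and the default format string are hypothetical stand-ins, not the connector's actual constants. The sketch also catches `ParseException`, which is a choice made for this example rather than something taken from the diff.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

class TimestampParsing {
    // Hypothetical keys and default, chosen for this sketch only.
    static final String TIMESTAMP_KEY = "initpos.timestamp";
    static final String FORMAT_KEY = "initpos.timestamp.format";
    static final String DEFAULT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    static Date parseInitTimestamp(Properties config) {
        String timestamp = config.getProperty(TIMESTAMP_KEY);
        // Fall back to the default pattern when no custom format is set.
        String format = config.getProperty(FORMAT_KEY, DEFAULT_FORMAT);
        try {
            return new SimpleDateFormat(format).parse(timestamp);
        } catch (IllegalArgumentException | NullPointerException | ParseException e) {
            // Pass the caught exception itself as the cause. Its getCause()
            // is typically null here, so wrapping only getCause() would
            // discard the original message and stack trace.
            throw new IllegalArgumentException(
                "Invalid initial timestamp '" + timestamp
                    + "' for date format '" + format + "'", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(TIMESTAMP_KEY, "2017-03-30");
        config.setProperty(FORMAT_KEY, "yyyy-MM-dd");
        System.out.println(parseInitTimestamp(config));
    }
}
```

With this shape, a caller who sees the `IllegalArgumentException` can still reach the original failure through `getCause()`, whether it came from an invalid pattern, a missing timestamp, or an unparseable value.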
> Let Date format for timestamp-based start position in Kinesis consumer be
> configurable.
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-5625
> URL: https://issues.apache.org/jira/browse/FLINK-5625
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> Currently, the Kinesis consumer's Date format for timestamp-based start
> positions is fixed. It'll be nice to make this format configurable.