[ 
https://issues.apache.org/jira/browse/FLINK-32394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-32394:
----------------------------------
    Description: 
The init.pos is only partially stored in connector state, which can lead to 
inconsistencies later on for idle streams. In particular, an issue arises when 
the init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.

The issue is that AT_TIMESTAMP is stored in the connector state, but the 
timestamp itself is not. If a stream is idle and the init.pos is changed to 
TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because 
it is stored in state) and tries to get the timestamp from the properties. 
However, the timestamp is no longer there, as the init.pos property is now 
TRIM_HORIZON.
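
The failure mode described above can be sketched in isolation. This is a 
minimal, simplified stand-in and not the connector's actual code: the class 
InitPosSketch and both methods are hypothetical, and the property keys and 
date format are assumed to match the Kinesis consumer configuration. It shows 
that parsing a missing timestamp property ends in a NullPointerException 
wrapped in an IllegalArgumentException, and how an explicit check could give a 
clearer error.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

// Hypothetical sketch; names and keys below are assumptions for illustration.
public class InitPosSketch {
    // Assumed property keys and timestamp format.
    static final String INIT_POS_KEY = "flink.stream.initpos";
    static final String TIMESTAMP_KEY = "flink.stream.initpos.timestamp";
    static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Parses the timestamp property without checking that it is present.
    static Date parseTimestamp(Properties config) {
        String timestamp = config.getProperty(TIMESTAMP_KEY); // null if unset
        try {
            // SimpleDateFormat.parse(null) throws NullPointerException.
            return new SimpleDateFormat(DATE_FORMAT).parse(timestamp);
        } catch (NullPointerException | ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Same parsing, but fails early with a descriptive message when the
    // restored position needs a timestamp that the properties do not carry.
    static Date parseTimestampChecked(Properties config) {
        if (config.getProperty(TIMESTAMP_KEY) == null) {
            throw new IllegalStateException(
                "State says AT_TIMESTAMP but " + TIMESTAMP_KEY + " is not set");
        }
        return parseTimestamp(config);
    }

    public static void main(String[] args) {
        // Properties after the user switched init.pos to TRIM_HORIZON:
        // the timestamp property has been removed.
        Properties config = new Properties();
        config.setProperty(INIT_POS_KEY, "TRIM_HORIZON");
        try {
            // The restored state still says AT_TIMESTAMP, so the timestamp
            // is looked up in the properties anyway.
            parseTimestamp(config);
        } catch (IllegalArgumentException e) {
            System.out.println("cause: " + e.getCause().getClass().getName());
        }
    }
}
```

Storing the timestamp next to the AT_TIMESTAMP marker in state, or checking 
for the property as in parseTimestampChecked, are two possible directions; 
neither is claimed here to be the fix that was adopted.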

Sample error

 
{code:java}
java.lang.IllegalArgumentException: java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:579)
    at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getStartingPosition(AWSUtil.java:325)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:495)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:465)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:592)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.NullPointerException
    at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1470)
    at java.base/java.text.DateFormat.parse(DateFormat.java:393)
    at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:577)
    ... 8 more

{code}

  was:
The init.pos is only partially stored in connector state, which can lead to 
inconsistencies later on for idle streams. In particular, an issue arises when 
the init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.

The issue is that AT_TIMESTAMP is stored in the connector state, but the 
timestamp itself is not. If a stream is idle and the init.pos is changed to 
TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because 
it is stored in state) and tries to get the timestamp from the properties. 
However, the timestamp is no longer there, as the init.pos property is now 
TRIM_HORIZON.

 


> Init.pos is partially stored in connector state
> -----------------------------------------------
>
>                 Key: FLINK-32394
>                 URL: https://issues.apache.org/jira/browse/FLINK-32394
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.17.1
>            Reporter: Usamah Jassat
>            Priority: Minor
>
> The init.pos is only partially stored in connector state, which can lead to 
> inconsistencies later on for idle streams. In particular, an issue arises 
> when the init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.
> The issue is that AT_TIMESTAMP is stored in the connector state, but the 
> timestamp itself is not. If a stream is idle and the init.pos is changed to 
> TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because 
> it is stored in state) and tries to get the timestamp from the properties. 
> However, the timestamp is no longer there, as the init.pos property is now 
> TRIM_HORIZON.
> Sample error
>  
> {code:java}
> java.lang.IllegalArgumentException: java.lang.NullPointerException
>     at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:579)
>     at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getStartingPosition(AWSUtil.java:325)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:495)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:465)
>     at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:592)
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: java.lang.NullPointerException
>     at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1470)
>     at java.base/java.text.DateFormat.parse(DateFormat.java:393)
>     at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:577)
>     ... 8 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
