[jira] [Updated] (SPARK-18020) Kinesis receiver does not snapshot when shard completes

2017-01-25 Thread Burak Yavuz (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-18020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Burak Yavuz updated SPARK-18020:

Assignee: Takeshi Yamamuro

> Kinesis receiver does not snapshot when shard completes
> ---
>
> Key: SPARK-18020
> URL: https://issues.apache.org/jira/browse/SPARK-18020
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
> Affects Versions: 2.0.0
> Reporter: Yonathan Randolph
> Assignee: Takeshi Yamamuro
> Priority: Minor
>  Labels: kinesis
> Fix For: 2.2.0
>
>
> When a Kinesis shard is split or merged and the old shard ends, the Amazon
> Kinesis Client Library [calls
> IRecordProcessor.shutdown|https://github.com/awslabs/amazon-kinesis-client/blob/v1.7.0/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java#L100]
> and expects {{IRecordProcessor.shutdown}} to checkpoint the sequence number
> {{ExtendedSequenceNumber.SHARD_END}} before returning. Unfortunately, Spark's
> [KinesisRecordProcessor|https://github.com/apache/spark/blob/v2.0.1/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala]
> sometimes does not checkpoint SHARD_END. This results in an error message, and
> Spark is then blocked indefinitely from processing any items from the child
> shards.
> This issue has also been raised on StackOverflow: [resharding while spark 
> running on kinesis 
> stream|http://stackoverflow.com/questions/38898691/resharding-while-spark-running-on-kinesis-stream]
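> For reference, here is a minimal sketch of the checkpoint-on-TERMINATE pattern
> that the KCL 1.x expects from {{IRecordProcessor.shutdown}}. It is not Spark's
> {{KinesisRecordProcessor}}; the class name is made up and error handling is
> omitted:
> {code}
> import java.util.{List => JList}
>
> import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
> import com.amazonaws.services.kinesis.model.Record
>
> class ExampleRecordProcessor extends IRecordProcessor {
>
>   override def initialize(shardId: String): Unit = {}
>
>   override def processRecords(records: JList[Record],
>                               checkpointer: IRecordProcessorCheckpointer): Unit = {
>     // Hand records to the application here; periodic checkpointing omitted.
>   }
>
>   override def shutdown(checkpointer: IRecordProcessorCheckpointer,
>                         reason: ShutdownReason): Unit = {
>     // TERMINATE means the shard has ended (it was split or merged). The KCL
>     // requires a final checkpoint here so that the lease is recorded as
>     // SHARD_END and the child shards can be processed. On ZOMBIE the lease
>     // has been lost, so checkpointing must be skipped.
>     if (reason == ShutdownReason.TERMINATE) {
>       checkpointer.checkpoint()
>     }
>   }
> }
> {code}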
> Exception that is logged:
> {code}
> 16/10/19 19:37:49 ERROR worker.ShutdownTask: Application exception.
> java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-0030
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:106)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Command used to split the shard:
> {code}
> aws kinesis --region us-west-1 split-shard --stream-name my-stream \
>     --shard-to-split shardId-0030 \
>     --new-starting-hash-key 5316911983139663491615228241121378303
> {code}
> After the Spark Streaming job has hung, examining the DynamoDB lease table
> shows that the parent shard's processor has not reached
> {{ExtendedSequenceNumber.SHARD_END}} and that the child shards are still at
> {{ExtendedSequenceNumber.TRIM_HORIZON}}, waiting for the parent to finish:
> {code}
> aws kinesis --region us-west-1 describe-stream --stream-name my-stream
> {
>     "StreamDescription": {
>         "RetentionPeriodHours": 24,
>         "StreamName": "my-stream",
>         "Shards": [
>             {
>                 "ShardId": "shardId-0030",
>                 "HashKeyRange": {
>                     "EndingHashKey": "10633823966279326983230456482242756606",
>                     "StartingHashKey": "0"
>                 },
>                 ...
>             },
>             {
>                 "ShardId": "shardId-0062",
>                 "HashKeyRange": {
>                     "EndingHashKey": "5316911983139663491615228241121378302",
>                     "StartingHashKey": "0"
>                 },
>                 "ParentShardId": "shardId-0030",
>                 "SequenceNumberRange": {
>                     "StartingSequenceNumber": "49566806087883755242230188435465744452396445937434624994"
>                 }
>             },
>             {
>                 "ShardId": "shardId-0063",
>                 "HashKeyRange": {
>                     "EndingHashKey": "10633823966279326983230456482242756606",
>                     "StartingHashKey": "5316911983139663491615228241121378303"
>                 },
>                 "ParentShardId": "shardId-0030",
>                 "SequenceNumberRange": {
>                     "StartingSequenceNumber": "49566806087906055987428719058607280170669094298940605426"
>                 }
>             },
>             ...
>         ],
>         "StreamStatus": "ACTIVE"
>     }
> }
>
> aws dynamodb --region us-west-1 scan --table-name my-processor
> {
>     "Items": [
>         {
>             "leaseOwner": {
>                 "S": "localhost:fd385c95-5d19-4678-926f-b6d5f5503cbe"
>             },
>             "leaseCounter": {
>                 "N": "49318"
>             },
>             "ownerSwitchesSinceCheckpoint": {
>                 "N": "62"
>             },
>             "checkpointSubSequenceNumber": {
>                 "N": "0"
>             },
>             "checkpoint": {
>                 "S": "49566573572821264975247582655142547856950135436343247330"
>             },
>             "parentShardId": {
>                 "SS": [
>                     "shardId-0014"
>                 ]
>             },
>             "leaseKey": {
>                 "S": "shardId-0030"
>             }
>         },
>         {
>             "leaseOwner": {
>                 "S": "localhost:ca44dc83-2580-4bf3-903f-e7ccc8a3ab02"
>             },
>             "leaseCounter": {
>                 "N": "25439"
>             },
>             ...
>         },
>         ...
>     ]
> }
> {code}
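> The same check can be scripted. This is only a rough sketch, not part of Spark
> or the KCL: it assumes the AWS SDK for Java v1 DynamoDB client, reuses the
> {{my-processor}} table name from above, and assumes the lease table stores
> sentinel checkpoints such as {{SHARD_END}} and {{TRIM_HORIZON}} as plain
> strings in the {{checkpoint}} attribute:
> {code}
> import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
> import com.amazonaws.services.dynamodbv2.model.ScanRequest
>
> import scala.collection.JavaConverters._
>
> object LeaseTableCheck {
>   def main(args: Array[String]): Unit = {
>     val dynamo = AmazonDynamoDBClientBuilder.standard().withRegion("us-west-1").build()
>     val items = dynamo.scan(new ScanRequest().withTableName("my-processor")).getItems.asScala
>
>     // Print each lease: a parent shard that is done should show SHARD_END,
>     // and its children should move past TRIM_HORIZON once they start.
>     for (item <- items) {
>       val shard = item.get("leaseKey").getS
>       val checkpoint = item.get("checkpoint").getS
>       val parents = Option(item.get("parentShardId")).map(_.getSS.asScala.mkString(",")).getOrElse("-")
>       println(s"$shard checkpoint=$checkpoint parents=$parents")
>     }
>   }
> }
> {code}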
