[ 
https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Milkowski updated SPARK-19364:
-------------------------------------
    Description: 
-- update -- We found that the situation described below occurs when we
encounter:

"com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard"

https://github.com/awslabs/amazon-kinesis-client/issues/108

We use an S3 directory (and DynamoDB) to store checkpoints. Even if this
exception occurs, blocks should not get stuck; they should continue to be
evicted gracefully from memory. The Kinesis library race condition is
obviously a problem unto itself.

-- exception leading to a block not being freed up --

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException:  Caught shutdown exception, skipping checkpoint.
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
        at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
        at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
        at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
        at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
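
The trace above passes through a retryRandom frame and ends in a "skipping
checkpoint" log line. As a rough illustration of that pattern only (not the
Spark or KCL source), the sketch below retries transient failures with a
random back-off and gives up immediately when the lease is lost. Every name in
it (CheckpointRetrySketch, RetryableException, LeaseLostException,
checkpointWithRetry) is a hypothetical stand-in, and returning false instead
of rethrowing is an assumption made so that the periodic caller keeps running.

```java
import java.util.Random;
import java.util.concurrent.Callable;

class CheckpointRetrySketch {

    /** Hypothetical stand-in for a transient failure such as throttling. */
    static class RetryableException extends Exception {
        RetryableException(String message) { super(message); }
    }

    /** Hypothetical stand-in for "instance doesn't hold the lease for this shard". */
    static class LeaseLostException extends Exception {
        LeaseLostException(String message) { super(message); }
    }

    /**
     * Runs the checkpoint action, retrying transient failures.
     * Returns true if the checkpoint succeeded and false if it was skipped
     * because the lease was lost. Rethrows the last RetryableException once
     * maxAttempts is exhausted; any other exception propagates unchanged.
     */
    static boolean checkpointWithRetry(Callable<Void> checkpoint,
                                       int maxAttempts,
                                       int maxBackoffMillis) throws Exception {
        if (maxAttempts < 1 || maxBackoffMillis < 1) {
            throw new IllegalArgumentException(
                "maxAttempts and maxBackoffMillis must be >= 1");
        }
        Random random = new Random();
        for (int attempt = 1; ; attempt++) {
            try {
                checkpoint.call();
                return true;
            } catch (LeaseLostException e) {
                // Retrying cannot help: another worker owns the shard now.
                System.err.println("Lease lost, skipping checkpoint: " + e.getMessage());
                return false;
            } catch (RetryableException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                // Random back-off so competing workers do not retry in lockstep.
                Thread.sleep(random.nextInt(maxBackoffMillis));
            }
        }
    }
}
```

A caller would wrap its real checkpoint call in the Callable. The point of
the sketch is only that a skipped checkpoint is a normal outcome the caller
can observe, which is separate from whether received blocks get released.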

We run standard Kinesis stream ingestion with a Java Spark app that creates a
DStream. After running for some time, some stream blocks seem to persist
forever and are never cleaned up, which eventually leads to memory depletion
on the workers.

We even tried cleaning the RDDs with the following:

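import org.apache.spark.ContextCleaner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

// Obtain the ContextCleaner from the underlying SparkContext.
final ContextCleaner cleaner = ssc.sparkContext().sc().cleaner().get();

// After each batch, explicitly request a blocking cleanup of that batch's RDD.
filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        cleaner.doCleanupRDD(rdd.id(), true);
    }
});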

Despite the above, the blocks still persist. This can be seen in the Spark
admin UI, for instance:

input-0-1485362233945   1       ip-<>:34245     Memory Serialized       1442.5 KB

The block above stays and is never cleaned up.
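
To help spot blocks like this one, here is a small sketch that splits a stream
block name into its parts. It assumes the name follows the
input-<streamId>-<uniqueId> pattern seen above, and it further assumes that
the unique ID is seeded from the wall clock in milliseconds when the receiver
starts, so it only approximates when the block was created. The class
StreamBlockIdSketch and its methods are hypothetical helpers, not Spark API.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StreamBlockIdSketch {
    // Assumed name format: input-<streamId>-<uniqueId>
    private static final Pattern STREAM_BLOCK =
        Pattern.compile("input-([0-9]+)-([0-9]+)");

    final int streamId;
    final long uniqueId;

    private StreamBlockIdSketch(int streamId, long uniqueId) {
        this.streamId = streamId;
        this.uniqueId = uniqueId;
    }

    /** Parses a block name such as "input-0-1485362233945". */
    static StreamBlockIdSketch parse(String blockName) {
        Matcher m = STREAM_BLOCK.matcher(blockName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a stream block name: " + blockName);
        }
        return new StreamBlockIdSketch(
            Integer.parseInt(m.group(1)), Long.parseLong(m.group(2)));
    }

    /** Interprets the unique ID as epoch milliseconds (an assumption). */
    Instant approximateCreationTime() {
        return Instant.ofEpochMilli(uniqueId);
    }
}
```

Under that assumption, the block above maps to roughly 2017-01-25 16:37 UTC.
Comparing that against the current time gives a rough age for each block that
is still listed in the UI.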


> Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are 
> enabled and an exception is thrown 
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19364
>                 URL: https://issues.apache.org/jira/browse/SPARK-19364
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.2
>         Environment: ubuntu unix
> spark 2.0.2
> application is java
>            Reporter: Andrew Milkowski


