[ https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897012#comment-15897012 ]
Takeshi Yamamuro commented on SPARK-19364:
------------------------------------------

Do you have any way to reproduce this issue? I couldn't follow your scenario. BTW, the "input-X-XXX" entries are stream blocks, so you cannot remove them via the ContextCleaner. These stream blocks are periodically removed in DStream#clearMetadata; see:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L463
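To make the distinction concrete, below is a minimal Java sketch of the receiving side (not the reporter's actual application). The class name, the socket source standing in for the Kinesis DStream, and the 10-second batch / 60-second remember durations are made up for illustration; the point is to let the streaming scheduler's periodic metadata cleanup handle block eviction instead of calling ContextCleaner#doCleanupRDD per micro-batch, since the ContextCleaner does not track stream blocks.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamBlockRetentionSketch {
      public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("stream-block-retention-sketch");
        // 10-second batch interval, picked arbitrarily for this sketch.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

        // Stand-in for the reporter's Kinesis-derived DStream ("filtered");
        // a socket source keeps the sketch self-contained.
        JavaDStream<String> filtered = jssc.socketTextStream("localhost", 9999);

        // Optional: bound how long generated RDDs (and their input blocks) are
        // remembered before the periodic DStream#clearMetadata pass drops them.
        jssc.remember(new Duration(60000));

        // Consume each micro-batch, but do NOT call ContextCleaner#doCleanupRDD here:
        // "input-X-XXX" stream blocks are not tracked by the ContextCleaner, so that
        // call cannot free them; eviction is left to the streaming scheduler.
        filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
          @Override
          public void call(JavaRDD<String> rdd) {
            System.out.println("batch record count = " + rdd.count());
          }
        });

        jssc.start();
        jssc.awaitTermination();
      }
    }

In this sketch, jssc.remember(...) only bounds how long generated RDDs are kept and is shown just to make the retention window explicit; under normal operation the cleanup happens on its own once each batch falls out of the remember duration.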
> Stream Blocks in Storage Persist Forever when Kinesis Checkpoints are enabled and an exception is thrown
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19364
>                 URL: https://issues.apache.org/jira/browse/SPARK-19364
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>        Environment: ubuntu unix
> spark 2.0.2
> application is java
>            Reporter: Andrew Milkowski
>            Priority: Blocker
>
> -- update --- we found that the situation below occurs when we encounter
> "com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard"
> https://github.com/awslabs/amazon-kinesis-client/issues/108
> we use an s3 directory (and dynamodb) to store checkpoints, but even when this occurs, blocks should not get stuck; they should continue to be evicted gracefully from memory. obviously the kinesis library race condition is a problem unto itself...
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException: Caught shutdown exception, skipping checkpoint.
> com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>         at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>         at scala.util.Try$.apply(Try.scala:192)
>         at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
>         at scala.Option.foreach(Option.scala:257)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
>         at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
>         at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>         at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>         at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
> running standard kinesis stream ingestion with a java spark app and creating a dstream: after running for some time, some stream blocks seem to persist forever and are never cleaned up, which eventually leads to memory depletion on the workers
> we even tried cleaning RDDs with the following:
> cleaner = ssc.sparkContext().sc().cleaner().get();
> filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
>     @Override
>     public void call(JavaRDD<String> rdd) throws Exception {
>         cleaner.doCleanupRDD(rdd.id(), true);
>     }
> });
> despite the above, blocks still persist; this can be seen in the spark admin UI, for instance:
> input-0-1485362233945    1    ip-<>:34245    Memory Serialized    1442.5 KB
> the above block stays and is never cleaned up

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org