[ https://issues.apache.org/jira/browse/SPARK-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903778#comment-15903778 ]

Andrew Milkowski edited comment on SPARK-19364 at 3/9/17 8:17 PM:
------------------------------------------------------------------

Thanks @Takeshi Yamamuro, I will try to make this error reproducible (we see it 
in prod non-stop, and there it is consistent). I will throw the exception from 
inside the Kinesis receiver (the Java client library), watch the stream blocks 
grow in Spark, and provide the line change needed to reproduce the problem. It 
is tied to the Kinesis Java library faulting on checkpoint (throwing an 
exception) while Spark keeps persisting stream blocks and never releases them 
from memory, until an eventual OutOfMemoryError.
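
Roughly the shape of the failure I plan to force (an untested sketch, not 
Spark's actual code; the class and method names here are mine):

import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

public final class CheckpointFault {

    private CheckpointFault() {}

    // Placeholder helper showing the shape of the failure: the checkpoint call
    // throws ShutdownException (the lost-lease race), the caller catches it and
    // skips the checkpoint, just like the "Caught shutdown exception, skipping
    // checkpoint" log line below -- but nothing ever tells the BlockManager to
    // drop the stream blocks the receiver already stored.
    public static void checkpointOrSkip(IRecordProcessorCheckpointer checkpointer) {
        try {
            // To reproduce deterministically, replace the real call with an
            // unconditional throw of the same ShutdownException.
            checkpointer.checkpoint();
        } catch (ShutdownException e) {
            // Lease was taken by another worker: skip this checkpoint.
        } catch (Exception e) {
            // Other KCL checkpoint failures (throttling, dependency errors)
            // would normally be retried.
        }
    }
}

The idea is to make the checkpoint call fail unconditionally and then watch the 
stored stream blocks grow in the executor storage UI.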



> Stream Blocks in Storage Persist Forever when Kinesis Checkpoints are 
> enabled and an exception is thrown 
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19364
>                 URL: https://issues.apache.org/jira/browse/SPARK-19364
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: ubuntu unix
> spark 2.0.2
> application is java
>            Reporter: Andrew Milkowski
>            Priority: Blocker
>
> -- update --- We found that the situation below occurs when we encounter
> "com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard"
> (see https://github.com/awslabs/amazon-kinesis-client/issues/108).
> We use an S3 directory (and DynamoDB) to store checkpoints. Even when this 
> exception occurs, blocks should not get stuck; they should continue to be 
> evicted gracefully from memory. The race condition in the Kinesis client 
> library is obviously a problem unto itself...
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException:  Caught 
> shutdown exception, skipping checkpoint.
> com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: 
> Can't update checkpoint - instance doesn't hold the lease for this shard
>       at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
>       at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
>       at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
>       at 
> com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
>       at scala.util.Try$.apply(Try.scala:192)
>       at 
> org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
>       at 
> org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
>       at 
> org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
>       at 
> org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
>       at 
> org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
> We are running standard Kinesis stream ingestion with a Java Spark app that 
> creates a DStream. After running for some time, some stream blocks seem to 
> persist forever and are never cleaned up, which eventually leads to memory 
> depletion on the workers; a simplified sketch of the setup follows.
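> (A minimal sketch of the kind of setup described above, not our exact code; 
> the application, stream, endpoint, and region names are placeholders.)
> import java.nio.charset.StandardCharsets;
> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
> import org.apache.spark.SparkConf;
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kinesis.KinesisUtils;
>
> public class KinesisIngestSketch {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName("kinesis-ingest-sketch");
>         JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>
>         // The Kinesis receiver stores incoming records as stream blocks
>         // (e.g. input-0-...) at the storage level given here.
>         JavaReceiverInputDStream<byte[]> stream = KinesisUtils.createStream(
>                 ssc, "my-app", "my-stream",
>                 "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
>                 InitialPositionInStream.LATEST, new Duration(10000),
>                 StorageLevel.MEMORY_AND_DISK_2());
>
>         // "filtered" stands in for the processed DStream referenced below.
>         JavaDStream<String> filtered = stream
>                 .map(bytes -> new String(bytes, StandardCharsets.UTF_8));
>         filtered.print();
>
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }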
> We even tried cleaning the RDDs ourselves with the following, where 
> "filtered" is the processed DStream:
>         // Ask Spark's ContextCleaner to drop each batch RDD once we are done with it.
>         final ContextCleaner cleaner = ssc.sparkContext().sc().cleaner().get();
>         filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
>             @Override
>             public void call(JavaRDD<String> rdd) throws Exception {
>                 cleaner.doCleanupRDD(rdd.id(), true);
>             }
>         });
> Despite the above, the blocks still persist. This can be seen in the Spark 
> admin UI, for instance:
> input-0-1485362233945    1    ip-<>:34245    Memory Serialized    1442.5 KB
> The block above stays and is never cleaned up.


