Just an update: Kinesis checkpointing works well with both orderly and
kill -9 driver shutdowns when there are fewer than 4 shards. We use 20+.

I created a case with Amazon support, since it is the AWS Kinesis getRecords
API that is hanging.

Regards,
Heji

On Thu, Nov 12, 2015 at 10:37 AM, Hster Geguri <hster.investiga...@gmail.com> wrote:

> Hello everyone,
>
> We are testing checkpointing against YARN 2.7.1 with Spark 1.5. We are
> trying to make sure checkpointing works with both orderly shutdowns (i.e.
> yarn application -kill) and unexpected shutdowns, which we simulate with a
> kill -9. If anyone has successfully tested failover with Kinesis+YARN
> recently, I would appreciate even a confirmation that this should work.
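>
> For the orderly path, one relevant knob is Spark's graceful-shutdown
> setting (standard since Spark 1.4). A minimal sketch of enabling it in the
> driver conf, though we have not confirmed it changes the behavior below
> (the app name is a placeholder):
>
>   import org.apache.spark.SparkConf
>
>   // Drain in-flight batches when the driver receives SIGTERM
>   // (the yarn application -kill path) instead of stopping abruptly.
>   val conf = new SparkConf()
>     .setAppName("kinesis-count-test") // placeholder name
>     .set("spark.streaming.stopGracefullyOnShutdown", "true")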
>
> We have a simple driver that does aggregate counts per minute against a
> Kinesis stream. We see initial hanging behavior (~2-10 minutes) after both
> kinds of relaunch. When we do an "unexpected" shutdown of the application
> master with a kill -9 of the JVM process, YARN successfully kills the
> orphaned executors and launches a new driver with new executors. The logs
> indicate that restoring the checkpoint was successful. However, the first
> two Spark Streaming batches, which contain 0 events, intermittently hang
> for anywhere between 2 and 10+ minutes with a SocketTimeoutException from
> a Kinesis getRecords call (stack trace at the end of this mail).
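>
> For reference, the driver is roughly the following sketch (the stream
> name, region, app name, and checkpoint path are placeholders, and the
> aggregation is simplified to a single count):
>
>   import org.apache.spark.SparkConf
>   import org.apache.spark.storage.StorageLevel
>   import org.apache.spark.streaming.{Minutes, StreamingContext}
>   import org.apache.spark.streaming.kinesis.KinesisUtils
>   import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
>
>   object KinesisCountDriver {
>     val checkpointDir = "hdfs:///tmp/kinesis-checkpoint" // placeholder path
>
>     def createContext(): StreamingContext = {
>       val ssc = new StreamingContext(
>         new SparkConf().setAppName("kinesis-count-test"), Minutes(1))
>       ssc.checkpoint(checkpointDir) // metadata checkpointing for driver recovery
>       val stream = KinesisUtils.createStream(
>         ssc, "kinesis-count-test", "my-stream",
>         "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
>         InitialPositionInStream.LATEST, Minutes(1),
>         StorageLevel.MEMORY_AND_DISK_2)
>       // Aggregate counts per one-minute batch.
>       stream.map(_ => ("events", 1L)).reduceByKey(_ + _).print()
>       ssc
>     }
>
>     def main(args: Array[String]): Unit = {
>       // On relaunch this restores from the checkpoint;
>       // otherwise it builds a fresh context.
>       val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>       ssc.start()
>       ssc.awaitTermination()
>     }
>   }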
>
> Under normal circumstances, Spark skips the transform and mapToPair stages
> when a batch has 0 events. When the executors hang, however, we notice that
> the job goes through the transform stage and a task blocks in a getRecords
> call to Kinesis for 2 minutes before emitting a "SocketTimeoutException:
> Read timed out".
>
> Kinesis as a service should fail more gracefully even when fed bad
> parameters, but why does Spark call getRecords at all when relaunching a
> batch of 0 events? (The stack trace suggests the recovered
> KinesisBackedBlockRDD re-fetches its stored sequence ranges from Kinesis.)
>
> Any input is greatly appreciated as we are stuck on testing failover.
>
> Heji
>
> I've put the stack trace below:
>
> [2015-11-09 15:20:23,478] INFO Unable to execute HTTP request: Read timed out (com.amazonaws.http.AmazonHttpClient)
> java.net.SocketTimeoutException: Read timed out
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>     at java.net.SocketInputStream.read(SocketInputStream.java:170)
>     at java.net.SocketInputStream.read(SocketInputStream.java:141)
>     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
>     at sun.security.ssl.InputRecord.read(InputRecord.java:532)
>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
>     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
>     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
>     at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
>     at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
>     at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
>     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
>     at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
>     at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
>     at com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
>     at com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
>     at com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
>     at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
>     at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
>     at com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
>     at com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
>     at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
>     at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
>     at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
>     at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
>     at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2490)
>     at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1142)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:258)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:213)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:199)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:155)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:127)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
