Hello Tathagata,

Thank you for responding. I have read your excellent article on Zero Data Loss many times.
The Spark Streaming UI shows the KCL consistently pulling events from the stream again after about half a minute, as usual, and those events get queued up into batches. It is always the first two batches (0 events each) that take 2-10 minutes to process in the transform stage. When I drill down into the tasks, I see that during the transform stage a few of the 25 tasks take minutes to complete while the rest finish in 1-3 seconds. The slow executor task appears to be hung on this thread:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2490)
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1142)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:258)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:213)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:199)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:155)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:127)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
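If I am reading the stack trace correctly, the slow transform tasks on the relaunched driver are re-fetching the checkpointed sequence-number ranges directly from Kinesis (KinesisBackedBlockRDD / KinesisSequenceRangeIterator calling getRecords), which would explain why the executor threads sit blocked in the SSL socket read.

In case it helps to see where our setup might differ from what you expect, the job follows the standard Spark 1.5 checkpoint-recovery pattern, roughly like the simplified sketch below. This is not our exact code; the app name, stream name, region, checkpoint path, and the trivial transform are all placeholders.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisFailoverJob {

  // Placeholder values, not our real names/paths.
  val appName       = "kinesis-failover-test"
  val streamName    = "test-stream"
  val endpointUrl   = "https://kinesis.us-west-2.amazonaws.com"
  val regionName    = "us-west-2"
  val checkpointDir = "hdfs:///checkpoints/kinesis-failover-test"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName(appName)
    val ssc  = new StreamingContext(conf, Seconds(5))   // 5 second batches
    ssc.checkpoint(checkpointDir)

    // Spark 1.5 Kinesis receiver; a real job usually creates one per shard
    // and unions them.
    val stream = KinesisUtils.createStream(
      ssc, appName, streamName, endpointUrl, regionName,
      InitialPositionInStream.LATEST, Seconds(5), StorageLevel.MEMORY_AND_DISK_2)

    // Transform stage placeholder: decode and count each batch.
    stream.map(bytes => new String(bytes, "UTF-8")).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // When YARN relaunches the application master, the driver recovers the
    // context from the checkpoint instead of building a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}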
Sincerely,
Heji

On Tue, Nov 3, 2015 at 3:14 AM, Tathagata Das <t...@databricks.com> wrote:

> The Kinesis integration underneath uses the KCL libraries, which sometimes take a minute or so to spin up their threads and start getting data from Kinesis. That is under normal conditions. In your case, because of the killing and restarting, the restarted KCL may be taking a while to get a new lease and start getting data again.
>
> On Mon, Nov 2, 2015 at 11:26 AM, Hster Geguri <hster.investiga...@gmail.com> wrote:
>
>> Hello Wonderful Spark People,
>>
>> We are testing AWS Kinesis/Spark Streaming (1.5) failover behavior with Hadoop/YARN 2.6 and 2.7.1 and want to understand the expected behavior.
>>
>> When I manually kill a YARN application master/driver with a Linux kill -9, YARN will automatically relaunch another master that successfully reads in the previous checkpoint.
>>
>> However, more than half the time the Kinesis executors (5 second batches) don't continue processing immediately, i.e. batches of 0 events are queued for 5-9 minutes before the job starts reprocessing the stream again. When I drill down into the current job, which is hanging, it shows all stages/tasks as complete. I would expect the automatically relaunched driver to behave much as it does after a manual resubmit with spark-submit, where stream processing continues within a minute of launch.
>>
>> Any input is highly appreciated.
>>
>> Thanks much,
>> Heji