Hello Tathagata,

Thank you for responding. I have read your excellent article on Zero Data
Loss many times.

The Spark Streaming UI shows the KCL consistently pulling events from the
stream after about half a minute as usual, and those batches get queued up.

It's always the first two batches (0 events each) that take 2-10 minutes to
process in the transform stage. When I drill down into the tasks, I see that
during the transform stage a few of the 25 tasks take minutes to complete
while the rest take 1-3 seconds.

The executor task appears to be hung on this thread:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2490)
com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1142)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:214)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:258)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:213)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:199)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:155)
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:127)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:203)
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
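
The task is blocked inside AmazonKinesisClient.getRecords, called from
KinesisSequenceRangeIterator.retryOrTimeout, which suggests the recovered
block ranges are being re-read directly from Kinesis after the restart.

For context, our streaming job is set up along the lines of the minimal
sketch below (the stream name, endpoint, region, and checkpoint path are
placeholders, not our actual configuration):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KinesisFailoverTest {
  // Placeholder checkpoint location; ours lives on HDFS.
  val checkpointDir = "hdfs:///tmp/kinesis-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kinesis-failover-test")
    val ssc = new StreamingContext(conf, Seconds(5))  // 5 second batches
    ssc.checkpoint(checkpointDir)

    // Placeholder stream/endpoint/region; a single receiver shown for brevity.
    val stream = KinesisUtils.createStream(
      ssc, "kinesis-failover-test", "my-stream",
      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
      InitialPositionInStream.LATEST, Seconds(5),
      StorageLevel.MEMORY_AND_DISK_2)

    // Simple shuffle-producing action, standing in for our transform stage.
    stream.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // After YARN relaunches the driver, getOrCreate rebuilds the context
    // from the checkpoint instead of calling createContext again.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

The relaunched driver goes through the getOrCreate recovery path above,
which is why we expected processing to resume within a minute, as it does
after a manual spark-submit.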


Sincerely,
Heji



On Tue, Nov 3, 2015 at 3:14 AM, Tathagata Das <t...@databricks.com> wrote:

> The Kinesis integration underneath uses the KCL libraries, which sometimes
> take a minute or so to spin up their threads and start getting data from
> Kinesis. That is under normal conditions. In your case, because of the
> killing and restarting, the restarted KCL may be taking a while to get a
> new lease and start getting data again.
>
> On Mon, Nov 2, 2015 at 11:26 AM, Hster Geguri <
> hster.investiga...@gmail.com> wrote:
>
>> Hello Wonderful Spark People,
>>
>> We are testing AWS Kinesis/Spark Streaming (1.5) failover behavior with
>> Hadoop/YARN 2.6 and 2.7.1 and want to understand the expected behavior.
>>
>> When I manually kill the YARN application master/driver with a Linux
>> kill -9, YARN automatically relaunches another master that successfully
>> reads in the previous checkpoint.
>>
>> However, more than half the time the Kinesis executors (5 second batches)
>> don't continue processing immediately. That is, batches of 0 events are
>> queued for 5-9 minutes before the job starts reprocessing the stream
>> again. When I drill down into the current job that is hanging, it shows
>> all stages/tasks are complete. I would expect the behavior after an
>> automatic relaunch to be similar to a manual resubmit with spark-submit,
>> where stream processing continues within a minute of launch.
>>
>> Any input is highly appreciated.
>>
>> Thanks much,
>> Heji
>>
>
>
