hang correlated to number of shards Re: Checkpointing with Kinesis hangs with socket timeouts when driver is relaunched while transforming on a 0 event batch

2015-11-13 Thread Hster Geguri
Just an update that Kinesis checkpointing works well with orderly and
kill -9 driver shutdowns when there are fewer than 4 shards.  We use 20+.

I created a case with Amazon support, since it is the AWS Kinesis getRecords
API that is hanging.
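
In case it is useful to anyone else chasing this, below is a rough sketch of
a standalone probe that exercises getRecords per shard outside of Spark
(stream name, endpoint, and limit are placeholders; it assumes the AWS Java
SDK v1 that spark-streaming-kinesis-asl already pulls in):

import com.amazonaws.ClientConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.GetRecordsRequest

import scala.collection.JavaConverters._

object KinesisGetRecordsProbe {
  def main(args: Array[String]): Unit = {
    val streamName = "my-stream"                                       // placeholder
    val config = new ClientConfiguration().withSocketTimeout(120000)   // ~2 min, matches the observed timeout
    val client = new AmazonKinesisClient(config)                       // default credential chain
    client.setEndpoint("kinesis.us-west-2.amazonaws.com")              // placeholder region

    // Time one getRecords call per shard to see which shards hang.
    val shards = client.describeStream(streamName).getStreamDescription.getShards.asScala
    for (shard <- shards) {
      val iterator = client
        .getShardIterator(streamName, shard.getShardId, "TRIM_HORIZON")
        .getShardIterator
      val start = System.currentTimeMillis()
      val result = client.getRecords(
        new GetRecordsRequest().withShardIterator(iterator).withLimit(100))
      val elapsed = System.currentTimeMillis() - start
      println(s"${shard.getShardId}: ${result.getRecords.size} records in $elapsed ms")
    }
  }
}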

Regards,
Heji


Checkpointing with Kinesis hangs with socket timeouts when driver is relaunched while transforming on a 0 event batch

2015-11-12 Thread Hster Geguri
Hello everyone,

We are testing checkpointing against YARN 2.7.1 with Spark 1.5. We are
trying to make sure checkpointing works with orderly shutdowns (i.e. yarn
application -kill) and unexpected shutdowns, which we simulate with a kill
-9.  If anyone has successfully tested failover recently with Kinesis+YARN,
I would appreciate even a confirmation that this should work.

We have a simple driver that does aggregate counts per minute against a
Kinesis stream.  We see initial hanging behavior (~2-10 minutes) in both
relaunch scenarios.  When we do an "unexpected" shutdown of the application
master with "kill -9" of the JVM process, YARN successfully kills the
orphaned executors and launches a new driver with new executors. The logs
indicate that restoring the checkpoint was successful.  However, the first
two Spark Streaming batches, which contain 0 events, intermittently hang for
anywhere between 2 and 10+ minutes with a SocketTimeoutException while doing
a Kinesis getRecords call (stack trace at the end of this mail).
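
For context, the driver is essentially the standard checkpoint-recovery
pattern; a trimmed-down sketch is below (app/stream names, checkpoint
directory, and the counting logic are placeholders, not our exact code):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisCountDriver {
  val checkpointDir = "hdfs:///spark/checkpoints/kinesis-counts"  // placeholder

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kinesis-counts")
    val ssc = new StreamingContext(conf, Minutes(1))
    ssc.checkpoint(checkpointDir)

    // Spark 1.5 createStream signature: app name, stream, endpoint, region,
    // initial position, Kinesis checkpoint interval, storage level.
    val stream = KinesisUtils.createStream(
      ssc, "kinesis-counts", "my-stream", "https://kinesis.us-west-2.amazonaws.com",
      "us-west-2", InitialPositionInStream.LATEST, Minutes(1),
      StorageLevel.MEMORY_AND_DISK_2)

    // The real job keys by fields in the payload; a plain count stands in here.
    stream.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On relaunch, this restores the context from the checkpoint instead of
    // building a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}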

Under normal circumstances, Spark skips the transform and mapToPair stages
on 0 events. However, when the executors hang, we notice that the job goes
through the transform stage, and tasks hang on the Kinesis getRecords call
for 2 minutes before emitting a "SocketTimeoutException: Read timed out".

Kinesis as a service should behave more gracefully even if it was fed bad
parameters, but why does Spark call getRecords at all when the batch size is
0 on relaunch?

Any input is greatly appreciated as we are stuck on testing failover.

Heji

I've put the stack trace below:

[2015-11-09 15:20:23,478] INFO Unable to execute HTTP request: Read timed out (com.amazonaws.http.AmazonHttpClient)
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
    at sun.security.ssl.InputRecord.read(InputRecord.java:532)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
    at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
    at com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
    at com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
    at com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
    at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
    at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
    at com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
    at com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
    at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
    at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
    at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
    at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:31

Re: Checkpointing with Kinesis

2015-09-18 Thread Michal Čizmazia
FYI re WAL on S3

http://search-hadoop.com/m/q3RTtFMpd41A7TnH/WAL+S3&subj=WAL+on+S3


Re: Checkpointing with Kinesis

2015-09-18 Thread Alan Dipert
Hello,

Thanks, all, for considering our problem.  We are doing transformations in
Spark Streaming.  We have also since learned that WAL to S3 on 1.4 is "not
reliable" [1].

We are just going to wait for EMR to support 1.5 and hopefully this won't
be a problem anymore [2].

Alan

1.
https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCA+AHuKkH9r0BwQMgQjDG+j=qdcqzpow1rw1u4d0nrcgmq5x...@mail.gmail.com%3E
2. https://issues.apache.org/jira/browse/SPARK-9215


Re: Checkpointing with Kinesis

2015-09-18 Thread Nick Pentreath
Are you doing actual transformations / aggregation in Spark Streaming, or
just using it to bulk write to S3?

If the latter, you could just use your AWS Lambda function to read directly
from the Kinesis stream. If the former, then perhaps look into the WAL option
that Aniket mentioned, or write the processed RDD back to Kinesis and have
the Lambda function read the Kinesis stream and write to Redshift?
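
If it helps, here is a rough sketch of the write-back idea (the stream name,
record shape, and serialization are placeholders; it uses the AWS Java SDK
that the kinesis-asl module already depends on):

import java.nio.ByteBuffer

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{PutRecordsRequest, PutRecordsRequestEntry}
import org.apache.spark.streaming.dstream.DStream

import scala.collection.JavaConverters._

object KinesisWriteBack {
  // Processed output modeled as (partitionKey, payload) pairs: placeholder shape.
  def writeBack(processed: DStream[(String, String)]): Unit = {
    processed.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One client per partition; uses the default credential provider chain.
        val client = new AmazonKinesisClient()
        client.setEndpoint("kinesis.us-east-1.amazonaws.com") // placeholder region

        // PutRecords takes at most 500 entries per call, so batch accordingly.
        // (Partial failures are ignored here for brevity.)
        records.grouped(500).foreach { batch =>
          val entries = batch.map { case (key, payload) =>
            new PutRecordsRequestEntry()
              .withPartitionKey(key)
              .withData(ByteBuffer.wrap(payload.getBytes("UTF-8")))
          }.asJava
          client.putRecords(new PutRecordsRequest()
            .withStreamName("processed-stream") // placeholder
            .withRecords(entries))
        }
      }
    }
  }
}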



Re: Checkpointing with Kinesis

2015-09-17 Thread Aniket Bhatnagar
You can perhaps set up a WAL that logs to S3? The new cluster should pick up
the records that weren't processed due to the previous cluster's termination.
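
Something along these lines in the driver setup, as a sketch (the bucket path
and batch interval are placeholders; the flag is the standard receiver
write-ahead log setting):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kinesis-to-s3")
  // Write received blocks to a write-ahead log as well as to memory.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(30))

// Receiver WAL segments and metadata checkpoints live under the checkpoint
// directory, so pointing it at S3 keeps them outside the cluster.
ssc.checkpoint("s3n://my-bucket/spark/checkpoints")  // placeholder bucket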

Thanks,
Aniket



Checkpointing with Kinesis

2015-09-17 Thread Alan Dipert
Hello,
We are using Spark Streaming 1.4.1 in AWS EMR to process records from
Kinesis.  Our Spark program saves RDDs to S3, after which the records are
picked up by a Lambda function that loads them into Redshift.  That no data
is lost during processing is important to us.

We have set our Kinesis checkpoint interval to 15 minutes, which is also
our window size.

Unfortunately, checkpointing happens after receiving data from Kinesis, not
after we have successfully written to S3.  If batches back up in Spark, and
the cluster is terminated, whatever data was in memory will be lost because
it was checkpointed but not actually saved to S3.
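
For concreteness, the save step in our driver is roughly the following sketch
(the bucket, prefix, and deserialization are simplified placeholders, not our
exact code):

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

object SaveWindowToS3 {
  // `records` is the deserialized Kinesis DStream; the 15-minute tumbling
  // window matches our Kinesis checkpoint interval.
  def save(records: DStream[String]): Unit = {
    records
      .window(Minutes(15), Minutes(15))
      .foreachRDD { (rdd, time) =>
        if (!rdd.isEmpty()) {
          // The Lambda function picks these objects up and loads them into Redshift.
          rdd.saveAsTextFile(s"s3n://my-bucket/stream-output/${time.milliseconds}") // placeholder path
        }
      }
  }
}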

We are considering forking and modifying the kinesis-asl library with
changes that would allow us to perform the checkpoint manually and at the
right time.  We'd rather not do this.

Are we overlooking an easier way to deal with this problem?  Thank you in
advance for your insight!

Alan