hang correlated to number of shards Re: Checkpointing with Kinesis hangs with socket timeouts when driver is relaunched while transforming on a 0 event batch
Just an update that the Kinesis checkpointing works well with both orderly and kill -9 driver shutdowns when there are fewer than 4 shards. We use 20+. I created a case with Amazon support, since it is the AWS Kinesis getRecords API that is hanging.

Regards,
Heji
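Since the hang is in the raw getRecords call and appears to scale with shard count, one way to narrow down whether the problem is in Spark or in the Kinesis endpoint itself is to issue a single getRecords call directly from one of the YARN nodes, outside of Spark. Below is a minimal standalone probe using the AWS Java SDK from Scala; the stream name, shard id, iterator type, and 10-second socket timeout are placeholder assumptions, not values taken from this thread.

```scala
import com.amazonaws.ClientConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}

object GetRecordsProbe {
  def main(args: Array[String]): Unit = {
    // Short socket timeout so a hanging read fails fast instead of stalling for minutes.
    val config = new ClientConfiguration().withSocketTimeout(10000)
    val kinesis = new AmazonKinesisClient(config) // credentials from the instance profile

    // Hypothetical stream and shard; substitute values from DescribeStream.
    val iterator = kinesis.getShardIterator(
      new GetShardIteratorRequest()
        .withStreamName("my-stream")
        .withShardId("shardId-000000000000")
        .withShardIteratorType("TRIM_HORIZON"))
      .getShardIterator

    val start = System.currentTimeMillis()
    val result = kinesis.getRecords(
      new GetRecordsRequest().withShardIterator(iterator).withLimit(100))
    println(s"getRecords returned ${result.getRecords.size()} records in " +
      s"${System.currentTimeMillis() - start} ms")
  }
}
```

If this probe also stalls on the shards in question, the issue sits below Spark; if it returns quickly, the hang is more likely in how the relaunched receivers re-establish their shard iterators.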
Checkpointing with Kinesis hangs with socket timeouts when driver is relaunched while transforming on a 0 event batch
Hello everyone,

We are testing checkpointing against YARN 2.7.1 with Spark 1.5. We are trying to make sure checkpointing works with orderly shutdowns (i.e. yarn application -kill) and unexpected shutdowns, which we simulate with a kill -9. If there is anyone who has successfully tested failover recently with Kinesis+YARN, I would appreciate even a confirmation that this should work.

We have a simple driver that does aggregate counts per minute against a Kinesis stream. We see initial hanging behavior (~2-10 minutes) in both relaunch scenarios. When we do an "unexpected" shutdown of the application master with "kill -9" of the JVM process, YARN successfully kills the orphaned executors and launches a new driver with new executors. The logs indicate that restoring the checkpoint was successful. However, the first two Spark Streaming batches, which contain 0 events, intermittently hang for anywhere from 2 to 10+ minutes with a SocketTimeoutException while doing a Kinesis getRecords call (stack trace at the end of this mail).

Under normal circumstances, Spark skips the transform and mapToPair stages on 0 events. However, when the executors hang, we notice that the job goes through the transform stage, and its tasks hang in a getRecords call to Kinesis for 2 minutes before emitting a "SocketTimeoutException: Read timed out" for that call.

Kinesis as a service should behave more gracefully even if it was fed bad parameters, but why does Spark call getRecords when the batch size is 0 on relaunch?

Any input is greatly appreciated, as we are stuck on testing failover.

Heji

I've put the stack trace below:

[2015-11-09 15:20:23,478] INFO Unable to execute HTTP request: Read timed out (com.amazonaws.http.AmazonHttpClient)
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
    at sun.security.ssl.InputRecord.read(InputRecord.java:532)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:176)
    at com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:408)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2184)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2165)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:279)
    at com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)
    at com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)
    at com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)
    at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:58)
    at com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)
    at com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)
    at com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)
    at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)
    at com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)
    at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)
    at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)
    at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:31
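For context, the driver pattern under test looks roughly like the sketch below: build the StreamingContext inside a creation function and let StreamingContext.getOrCreate restore it from the checkpoint directory on relaunch. This is a minimal version against the Spark 1.5 spark-streaming-kinesis-asl API; the application name, stream name, endpoint, region, batch interval, and checkpoint directory are placeholders rather than details taken from this thread.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisFailoverTest {
  val checkpointDir = "hdfs:///spark/checkpoints/kinesis-failover-test" // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kinesis-failover-test")
    val ssc = new StreamingContext(conf, Minutes(1))
    ssc.checkpoint(checkpointDir)

    val stream = KinesisUtils.createStream(
      ssc, "kinesis-failover-test", "my-stream",
      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
      InitialPositionInStream.LATEST, Seconds(60), StorageLevel.MEMORY_AND_DISK_2)

    // Stand-in for the per-minute aggregation described above.
    stream.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On relaunch (orderly or kill -9), the driver should rebuild the context from
    // the checkpoint instead of calling createContext() again.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Run it once, kill the driver, and resubmit; the second run should log that it is recovering the context from the checkpoint rather than creating a new one.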
Re: Checkpointing with Kinesis
FYI re WAL on S3: http://search-hadoop.com/m/q3RTtFMpd41A7TnH/WAL+S3&subj=WAL+on+S3
Re: Checkpointing with Kinesis
Hello,

Thanks all for considering our problem. We are doing transformations in Spark Streaming. We have also since learned that the WAL to S3 on 1.4 is "not reliable" [1].

We are just going to wait for EMR to support 1.5, and hopefully this won't be a problem anymore [2].

Alan

1. https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCA+AHuKkH9r0BwQMgQjDG+j=qdcqzpow1rw1u4d0nrcgmq5x...@mail.gmail.com%3E
2. https://issues.apache.org/jira/browse/SPARK-9215
Re: Checkpointing with Kinesis
Are you doing actual transformations / aggregation in Spark Streaming? Or just using it to bulk write to S3?

If the latter, then you could just use your AWS Lambda function to read directly from the Kinesis stream. If the former, then perhaps either look into the WAL option that Aniket mentioned, or perhaps you could write the processed RDD back to Kinesis, and have the Lambda function read the Kinesis stream and write to Redshift?
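If the write-the-processed-RDD-back-to-Kinesis route is taken, it could look roughly like the sketch below. The output stream name "processed-events" and the (partition key, payload) record shape are assumptions for illustration, and the no-argument client constructor relies on the EMR instance profile for credentials.

```scala
import java.nio.ByteBuffer

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.spark.streaming.dstream.DStream

// `processed` stands in for the already-aggregated stream of (partitionKey, payload) pairs.
def writeBackToKinesis(processed: DStream[(String, String)]): Unit = {
  processed.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // One client per partition, so executors do not try to share a non-serializable client.
      val kinesis = new AmazonKinesisClient()
      partition.foreach { case (key, payload) =>
        kinesis.putRecord(new PutRecordRequest()
          .withStreamName("processed-events") // hypothetical output stream
          .withPartitionKey(key)
          .withData(ByteBuffer.wrap(payload.getBytes("UTF-8"))))
      }
    }
  }
}
```

In practice you would probably batch with PutRecords and handle partial failures; one PutRecord per element keeps the sketch simple.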
Re: Checkpointing with Kinesis
You can perhaps set up a WAL that logs to S3? The new cluster should pick up the records that weren't processed due to the previous cluster's termination.

Thanks,
Aniket
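For reference, enabling the receiver write-ahead log is a configuration flag plus a reliable directory for the log, which lives under the streaming checkpoint directory. A minimal sketch follows, with a placeholder application name and checkpoint path; it points at HDFS rather than S3, since (as noted further up in this digest) the S3-backed WAL was reported unreliable on 1.4.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf()
  .setAppName("kinesis-to-s3") // placeholder name
  // Durably log received blocks before the receiver acknowledges them.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Minutes(15))
// The WAL is written under the checkpoint directory; placeholder HDFS path.
ssc.checkpoint("hdfs:///spark/checkpoints/kinesis-to-s3")
```

With the WAL enabled, the usual advice is to also drop receiver-side replication (e.g. StorageLevel.MEMORY_AND_DISK_SER instead of MEMORY_AND_DISK_2), since the log already provides durability.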
Checkpointing with Kinesis
Hello,

We are using Spark Streaming 1.4.1 in AWS EMR to process records from Kinesis. Our Spark program saves RDDs to S3, after which the records are picked up by a Lambda function that loads them into Redshift. It is important to us that no data is lost during processing.

We have set our Kinesis checkpoint interval to 15 minutes, which is also our window size.

Unfortunately, checkpointing happens after receiving data from Kinesis, not after we have successfully written to S3. If batches back up in Spark and the cluster is terminated, whatever data was in memory will be lost, because it was checkpointed but not actually saved to S3.

We are considering forking and modifying the kinesis-asl library with changes that would allow us to perform the checkpoint manually and at the right time. We'd rather not do this.

Are we overlooking an easier way to deal with this problem? Thank you in advance for your insight!

Alan
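For readers following along, the pipeline described above looks roughly like the sketch below (stream, bucket, endpoint, and application names are placeholders). The final Minutes(15) argument is the Kinesis/KCL checkpoint interval stored in DynamoDB and, as the message explains, it advances once records have been received, independently of whether the foreachRDD output has reached S3.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val conf = new SparkConf().setAppName("kinesis-to-redshift-staging") // placeholder name
val ssc = new StreamingContext(conf, Minutes(15)) // batch interval == window size

val records = KinesisUtils.createStream(
  ssc, "kinesis-to-redshift-staging", "my-stream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.TRIM_HORIZON, Minutes(15), StorageLevel.MEMORY_AND_DISK_2)

records.map(bytes => new String(bytes, "UTF-8")).foreachRDD { (rdd, time) =>
  // Objects written here are picked up by the Lambda function and loaded into Redshift.
  rdd.saveAsTextFile(s"s3://my-bucket/staging/${time.milliseconds}") // placeholder bucket
}

ssc.start()
ssc.awaitTermination()
```

Forking kinesis-asl to move the KCL checkpoint after the S3 write is one option; the replies above suggest the receiver write-ahead log, or having the Lambda read Kinesis directly, as alternatives.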