Hi Seth,

Thank you for your suggestion.

But if the issue is only related to S3, then why does this happen when I
replace the S3 sink  to HDFS as well (for checkpointing I am using HDFS
only )

Stephan,
Another issue I see is when I set env.setBufferTimeout(-1) , and keep the
checkpoint interval to 10minutes, I have observed that nothing gets written
to sink (tried with S3 as well as HDFS), atleast I was expecting pending
files here.
This issue gets worst when checkpointing is disabled  as nothing is written.



Regards,
Vinay Patil

On Mon, Feb 27, 2017 at 10:55 PM, Stephan Ewen [via Apache Flink User
Mailing List archive.] <ml-node+s2336050n11943...@n4.nabble.com> wrote:

> Hi Seth!
>
> Wow, that is an awesome approach.
>
> We have actually seen these issues as well and we are looking to
> eventually implement our own S3 file system (and circumvent Hadoop's S3
> connector that Flink currently relies on): https://issues.apache.
> org/jira/browse/FLINK-5706
>
> Do you think your patch would be a good starting point for that and would
> you be willing to share it?
>
> The Amazon AWS SDK for Java is Apache 2 licensed, so that is possible to
> fork officially, if necessary...
>
> Greetings,
> Stephan
>
>
>
> On Mon, Feb 27, 2017 at 5:15 PM, Seth Wiesman <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=11943&i=0>> wrote:
>
>> Just wanted to throw in my 2cts.
>>
>>
>>
>> I’ve been running pipelines with similar state size using rocksdb which
>> externalize to S3 and bucket to S3. I was getting stalls like this and
>> ended up tracing the problem to S3 and the bucketing sink. The solution was
>> two fold:
>>
>>
>>
>> 1)       I forked hadoop-aws and have it treat flink as a source of
>> truth. Emr uses a dynamodb table to determine if S3 is inconsistent.
>> Instead I say that if flink believes that a file exists on S3 and we don’t
>> see it then I am going to trust that flink is in a consistent state and S3
>> is not. In this case, various operations will perform a back off and retry
>> up to a certain number of times.
>>
>>
>>
>> 2)       The bucketing sink performs multiple renames over the lifetime
>> of a file, occurring when a checkpoint starts and then again on
>> notification after it completes. Due to S3’s consistency guarantees the
>> second rename of file can never be assured to work and will eventually fail
>> either during or after a checkpoint. Because there is no upper bound on the
>> time it will take for a file on S3 to become consistent, retries cannot
>> solve this specific problem as it could take upwards of many minutes to
>> rename which would stall the entire pipeline. The only viable solution I
>> could find was to write a custom sink which understands S3. Each writer
>> will write file locally and then copy it to S3 on checkpoint. By only
>> interacting with S3 once per file it can circumvent consistency issues all
>> together.
>>
>>
>>
>> Hope this helps,
>>
>>
>>
>> Seth Wiesman
>>
>>
>>
>> *From: *vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=11943&i=1>>
>> *Reply-To: *"[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=11943&i=2>" <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=11943&i=3>>
>> *Date: *Saturday, February 25, 2017 at 10:50 AM
>> *To: *"[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=11943&i=4>" <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=11943&i=5>>
>> *Subject: *Re: Checkpointing with RocksDB as statebackend
>>
>>
>>
>> HI Stephan,
>>
>> Just to avoid the confusion here, I am using S3 sink for writing the
>> data, and using HDFS for storing checkpoints.
>>
>> There are 2 core nodes (HDFS) and two task nodes on EMR
>>
>>
>> I replaced s3 sink with HDFS for writing data in my last test.
>>
>> Let's say the checkpoint interval is 5 minutes, now within 5minutes of
>> run the state size grows to 30GB ,  after checkpointing the 30GB state that
>> is maintained in rocksDB has to be copied to HDFS, right ?  is this causing
>> the pipeline to stall ?
>>
>>
>> Regards,
>>
>> Vinay Patil
>>
>>
>>
>> On Sat, Feb 25, 2017 at 12:22 AM, Vinay Patil <[hidden email]> wrote:
>>
>> Hi Stephan,
>>
>> To verify if S3 is making teh pipeline stall, I have replaced the S3 sink
>> with HDFS and kept minimum pause between checkpoints to 5minutes, still I
>> see the same issue with checkpoints getting failed.
>>
>> If I keep the  pause time to 20 seconds, all checkpoints are completed ,
>> however there is a hit in overall throughput.
>>
>>
>>
>>
>> Regards,
>>
>> Vinay Patil
>>
>>
>>
>> On Fri, Feb 24, 2017 at 10:09 PM, Stephan Ewen [via Apache Flink User
>> Mailing List archive.] <[hidden email]> wrote:
>>
>> Flink's state backends currently do a good number of "make sure this
>> exists" operations on the file systems. Through Hadoop's S3 filesystem,
>> that translates to S3 bucket list operations, where there is a limit in how
>> many operation may happen per time interval. After that, S3 blocks.
>>
>>
>>
>> It seems that operations that are totally cheap on HDFS are hellishly
>> expensive (and limited) on S3. It may be that you are affected by that.
>>
>>
>>
>> We are gradually trying to improve the behavior there and be more S3
>> aware.
>>
>>
>>
>> Both 1.3-SNAPSHOT and 1.2-SNAPSHOT already contain improvements there.
>>
>>
>>
>> Best,
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Fri, Feb 24, 2017 at 4:42 PM, vinay patil <[hidden email]
>> <http://user/SendEmail.jtp?type=node&node=11891&i=0>> wrote:
>>
>> Hi Stephan,
>>
>> So do you mean that S3 is causing the stall , as I have mentioned in my
>> previous mail, I could not see any progress for 16minutes as checkpoints
>> were getting failed continuously.
>>
>>
>>
>> On Feb 24, 2017 8:30 PM, "Stephan Ewen [via Apache Flink User Mailing
>> List archive.]" <[hidden email]
>> <http://user/SendEmail.jtp?type=node&node=11887&i=0>> wrote:
>>
>> Hi Vinay!
>>
>>
>>
>> True, the operator state (like Kafka) is currently not asynchronously
>> checkpointed.
>>
>>
>>
>> While it is rather small state, we have seen before that on S3 it can
>> cause trouble, because S3 frequently stalls uploads of even data amounts as
>> low as kilobytes due to its throttling policies.
>>
>>
>>
>> That would be a super important fix to add!
>>
>>
>>
>> Best,
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Fri, Feb 24, 2017 at 2:58 PM, vinay patil <[hidden email]
>> <http://user/SendEmail.jtp?type=node&node=11885&i=0>> wrote:
>>
>> Hi,
>>
>> I have attached a snapshot for reference:
>> As you can see all the 3 checkpointins failed , for checkpoint ID 2 and 3
>> it
>> is stuck at the Kafka source after 50%
>> (The data sent till now by Kafka source 1 is 65GB and sent by source 2 is
>> 15GB )
>>
>> Within 10minutes 15M records were processed, and for the next 16minutes
>> the
>> pipeline is stuck , I don't see any progress beyond 15M because of
>> checkpoints getting failed consistently.
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/file/n11882/Checkpointing_Failed.png>
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Re-Checkpointing-
>> with-RocksDB-as-statebackend-tp11752p11882.html
>>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>>
>>
>>
>> ------------------------------
>>
>> *If you reply to this email, your message will be added to the discussion
>> below:*
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-
>> tp11752p11885.html
>>
>> To start a new topic under Apache Flink User Mailing List archive., email 
>> [hidden
>> email] <http://user/SendEmail.jtp?type=node&node=11887&i=1>
>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>> NAML
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>>
>> ------------------------------
>>
>> View this message in context: Re: Checkpointing with RocksDB as
>> statebackend
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11887.html>
>>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>>
>>
>>
>> ------------------------------
>>
>> *If you reply to this email, your message will be added to the discussion
>> below:*
>>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-
>> tp11752p11891.html
>>
>> To start a new topic under Apache Flink User Mailing List archive., email 
>> [hidden
>> email]
>> To unsubscribe from Apache Flink User Mailing List archive., click here.
>> NAML
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>
>>
>>
>>
>>
>>
>> ------------------------------
>>
>> View this message in context: Re: Checkpointing with RocksDB as
>> statebackend
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11913.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> at Nabble.com.
>>
>>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-
> Checkpointing-with-RocksDB-as-statebackend-tp11752p11943.html
> To start a new topic under Apache Flink User Mailing List archive., email
> ml-node+s2336050n1...@n4.nabble.com
> To unsubscribe from Apache Flink User Mailing List archive., click here
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=dmluYXkxOC5wYXRpbEBnbWFpbC5jb218MXwxODExMDE2NjAx>
> .
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-Checkpointing-with-RocksDB-as-statebackend-tp11752p11948.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to