Thanks Kabeer. That helps me understand what you are trying to do. This
should be easy to achieve. Happy to work through some tuning/patches as we
go. I have started looking into this now. Will update
https://issues.apache.org/jira/browse/HUDI-81 with all details as I find
them.

Two things, if you can clarify:

1) The reason I asked whether this is a streaming job is because we have been
running cleaning for more than two years now from Spark jobs that run every 30
minutes. Or maybe you are doing this from a notebook (which is also long
running)? What makes your Spark job not exit?

2) Also, folks from Vungle have run ingest on GCS in streaming mode and did
not spot any issues, which makes me wonder if this is an S3 issue. Can you
share how you are monitoring the number of connections to S3? (This helps
me first repro the issue.)

Thanks
Vinoth






On Mon, Mar 25, 2019 at 6:01 PM Kabeer Ahmed <[email protected]> wrote:

> Hi Vinoth,
>
> *Context*
>
> The application I wrote is generic, to tackle the loads of CSV and other data
> files lying around in organizations. Take the case of typical stock trading
> information contained in such files. The files have, say, customer_id as the
> primary key and a transaction timestamp. As one can imagine, the issue with
> loading such files is that they may arrive out of order. There are two
> options:
>
>    1. The developer ensures that files are uploaded in timestamp order, OR
>    2. Write intelligent code in HUDI that can reject outdated updates of
>    records.
>
> I have taken the second approach, by implementing the routine below to suit my
> requirement.
>
> // Routine to implement
> public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema)
>
> This also means we do not have to worry about data corruption if the
> application crashes and the file reload process has to be restarted.
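>
> As an illustration only, here is a minimal sketch of such a routine. It assumes
> the incoming record is held as an Avro GenericRecord and that the transaction
> timestamp lives in a long field named "ts"; both the class and the field name
> are placeholders, not the actual production code.
>
> import java.util.Optional;
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.IndexedRecord;
>
> public class LatestTimestampPayload {
>
>   private final GenericRecord incoming; // record being upserted
>
>   public LatestTimestampPayload(GenericRecord incoming) {
>     this.incoming = incoming;
>   }
>
>   public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) {
>     long oldTs = (Long) ((GenericRecord) oldRec).get("ts"); // timestamp already stored
>     long newTs = (Long) incoming.get("ts");                 // timestamp of the update
>     // Keep the stored record when the incoming update is older, i.e. reject outdated updates.
>     return newTs >= oldTs ? Optional.of((IndexedRecord) incoming) : Optional.of(oldRec);
>   }
> }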
>
> *Application*
>
> The first cut of the application was written so that Spark read a folder of
> files and then processed each file separately. After about 30 files the S3
> connection limit would be breached, and I thought I could get around that by
> running one spark-submit per file. This did help to process files until two
> years' worth of daily partitioned data had been loaded.
>
> Each file is split, and two distinct DataFrames are written to two distinct
> tables. After 2+ years of processed data, each df.write would create 750+
> connections and breach the 1500-connection S3 limit. This made HUDI
> practically impossible to use for any dataset that could grow to a timeline of
> three years.
>
> I have now increased the S3 connection limit to 5000 with a view to finding
> the root cause and fixing it with your help. This is just a time-buying
> exercise; it is not something that will scale.
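>
> For reference, a minimal sketch of where such a ceiling can be applied from
> the Spark job itself, assuming the s3a connector is in use;
> fs.s3a.connection.maximum is the standard Hadoop S3A pool-size option, and the
> application name below is just a placeholder.
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.SparkSession;
>
> public class RaiseS3ConnectionLimit {
>   public static void main(String[] args) {
>     SparkConf conf = new SparkConf()
>         .setAppName("hudi-ingest") // placeholder application name
>         // Raise the s3a connection pool ceiling to the 5000 mentioned above.
>         .set("spark.hadoop.fs.s3a.connection.maximum", "5000");
>     SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
>     // ... Hudi reads/writes go here ...
>     spark.stop();
>   }
> }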
>
> When I said 3 levels down: the table is partitioned by date, e.g. 2019/03/01,
> under the base path. Each call to such a partition results in a distinct file
> access, i.e. it creates a single S3 connection.
>
> *Further Steps*
>
> I think it should be possible to replicate this scenario on a Cloudera HDFS
> cluster. I have put up a snapshot of one such chart from the Cloudera library;
> it can be monitored before, during and after closing the application to see
> how many HDFS connections are being opened.
>
> Thank you for all the responses.
> Kabeer.
>
>
> On Mar 25 2019, at 11:43 pm, Vinoth Chandar <[email protected]> wrote:
>
> Hi Kabeer,
>
> Actually, knowing things like "S3 creates connections three levels down" helps
> us immensely already! :) We typically test with HDFS, which just talks to the
> namenode over RPC.
>
> For context, are you running hudi in a streaming job that does not exit?
>
> Thanks
> Vinoth
>
> On Mon, Mar 25, 2019 at 2:58 PM Kabeer Ahmed <[email protected]> wrote:
>
> Thank you Vinoth and Balaji. As soon as you have a patch, I can get the
> tests going on S3 and relay back the results.
>
> Vinoth:
> My proficiency with the code can never be better than yours or Balaji's. I
> have never looked in depth at whether fs calls (getPartitions()) are
> translated into filesystem calls.
>
> But as far as I understand S3, even a simple get-partition call that goes 3
> levels down to retrieve the objects results in an S3 connection. Let me give a
> gist of what we do in the pseudo-code below.
>
> void insertHudiRecords() {
>   // Prepare the first HUDI DataFrame and write it.
>   // S3 connections increase by ~750, in line with the number of partitions we have.
>   df1.write.format(com.uber.hudi)....
>
>   // Prepare the second HUDI DataFrame and write it.
>   // S3 connections are already at the 750 level and another ~750 are added on top.
>   df2.write.format(com.uber.hudi)...
> }
>
> // The S3 connections are released only when the Spark process running the
> // routine above finishes, i.e. when the application itself exits.
> Thank you for all your responses.
> On Mar 25 2019, at 4:15 pm, [email protected] wrote:
>
> +1, incremental cleaning is scheduled work. I will be working on this
> immediately after HUDI-1.
>
> Balaji.V
>
> On Sunday, March 24, 2019, 7:42:03 PM PDT, Vinoth Chandar <[email protected]> wrote:
>
>
> Hi Kabeer,
> You are right. HUDI-1 alone won't be sufficient; we need to do a follow-on.
> IIRC this is already planned work (Balaji?).
> Filed https://issues.apache.org/jira/browse/HUDI-80 to separate this from
> HUDI-1.
>
> On to the issue you are facing: it seems like the connections to S3 keep
> hanging around? I don't think cleaning actually opens any files; it simply
> lists and deletes. We could call fs.close, which would probably shut the
> connections down, but I need to think through that more, since fs caching is a
> tricky issue. I filed https://issues.apache.org/jira/browse/HUDI-81 separately
> to track this. If you can help me track the connections to S3 etc., I can take
> a stab at it and maybe we can test the patch in your environment?
>
> We can work on the ticket. Please share your jira id, so I can add you as a
> contributor, giving you commenting rights etc. on jira.
>
> Thanks
> Vinoth
>
>
>
> On Sun, Mar 24, 2019 at 2:11 PM Kabeer Ahmed <[email protected]> wrote:
>
> Hi Vinoth,
> Thank you for your response. I thought of reducing cleaner parallelism, which
> is Min(200, table_partitions). But it would not have an effect: regardless of
> parallelism, there will be an attempt to scan all files (reduced parallelism
> might merely slow the process down).
> So, as stated, on a table with 750+ partitions I did notice that connections
> would increase, and I have now been forced to raise the S3 connection limit to
> 5k due to this issue.
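>
> For concreteness, here is a minimal sketch of what lowering cleaner parallelism
> on a write looks like, assuming Spark's Java DataFrame API and the
> hoodie.cleaner.parallelism option; the table name, key fields and paths are
> placeholders, and the format string follows the pseudo-code earlier in this
> thread.
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.SparkSession;
>
> public class CleanerParallelismSketch {
>   public static void main(String[] args) {
>     SparkSession spark = SparkSession.builder().appName("cleaner-parallelism-sketch").getOrCreate();
>     Dataset<Row> df = spark.read().option("header", "true").csv("s3a://bucket/incoming/trades.csv");
>
>     df.write()
>         .format("com.uber.hudi")                                  // as in the pseudo-code above
>         .option("hoodie.table.name", "stock_trades")              // placeholder table name
>         .option("hoodie.datasource.write.recordkey.field", "customer_id")
>         .option("hoodie.datasource.write.precombine.field", "ts") // placeholder timestamp field
>         .option("hoodie.cleaner.parallelism", "20")               // below the default Min(200, #partitions)
>         .mode(SaveMode.Append)
>         .save("s3a://bucket/hudi/stock_trades");                  // placeholder base path
>
>     spark.stop();
>   }
> }
>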
> I also looked into the brief description of the jira:
> https://issues.apache.org/jira/browse/HUDI-1.
>
> This is a very nice optimisation to have, but I don't think it will help
> alleviate the concerns on S3. On HDFS, this jira will definitely help reduce
> the number of namenode connections, but S3 objects will still need to be
> opened to clear them and the problem will not go away.
> I think the effective work has to be along the lines of how the partitions are
> cleaned in the routine below:
>
> // File: HoodieCopyOnWriteTable.java
> public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
>   try {
>     FileSystem fs = getMetaClient().getFs();
>     List<String> partitionsToClean = FSUtils
>         .getAllPartitionPaths(fs, getMetaClient().getBasePath(),
>             config.shouldAssumeDatePartitioning());
>     logger.info("Partitions to clean up : " + partitionsToClean + ", with policy "
>         + config.getCleanerPolicy());
>     if (partitionsToClean.isEmpty()) {
>       logger.info("Nothing to clean here mom. It is already clean");
>       return Collections.emptyList();
>     }
>     return cleanPartitionPaths(partitionsToClean, jsc);
>   } catch (IOException e) {
>     throw new HoodieIOException("Failed to clean up after commit", e);
>   }
> }
> In the above routine, none of the connections that are opened get closed. I
> think the work should be along the lines of closing the connections in this
> routine after the cleaning operation, i.e. file-close logic added so that it is
> executed in parallel for every file opened by the Spark executors.
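>
> As an illustration of that idea only (not a committed fix), here is a minimal
> sketch of the same clean() routine with a best-effort close added at the end;
> whether this is actually safe depends on Hadoop's FileSystem caching.
>
> public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
>   FileSystem fs = getMetaClient().getFs();
>   try {
>     List<String> partitionsToClean = FSUtils
>         .getAllPartitionPaths(fs, getMetaClient().getBasePath(),
>             config.shouldAssumeDatePartitioning());
>     if (partitionsToClean.isEmpty()) {
>       return Collections.emptyList();
>     }
>     return cleanPartitionPaths(partitionsToClean, jsc);
>   } catch (IOException e) {
>     throw new HoodieIOException("Failed to clean up after commit", e);
>   } finally {
>     try {
>       // Release the connections held by this FileSystem instance once cleaning
>       // is done; risky if the instance comes from Hadoop's shared FS cache.
>       fs.close();
>     } catch (IOException ignored) {
>       // best-effort close in this sketch
>     }
>   }
> }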
>
> Please feel free to correct me if you think I have goofed up somewhere.
> Thanks
> Kabeer.
>
> PS: There is so much going on and there is a need to make progress with the
> stuff at hand at work. Otherwise I would have loved to spend the time and send
> a PR.
> On Mar 24 2019, at 7:04 am, Vinoth Chandar <[email protected]> wrote:
>
> Hi Kabeer,
>
> No need to apologize :)
> The mailing list works a lot better for reporting issues. We can respond much
> quicker, since it's not buried with all the other GitHub events.
>
> On what you saw: the cleaner does list all partitions currently. Have you
> tried reducing cleaner parallelism, if limiting connections is your goal?
>
> Also, some good news is that once
> https://issues.apache.org/jira/browse/HUDI-1 is landed (currently being
> reviewed), a follow-on is to rework the cleaner incrementally on top, which
> should help a lot here.
>
>
> Thanks
> Vinoth
>
> On Sat, Mar 23, 2019 at 7:39 PM Kabeer Ahmed <[email protected]> wrote:
>
> Hi,
> I have just raised this issue and thought to share it with the community in
> case someone else is experiencing this. Apologies in advance if this is a
> redundant email.
> Thanks
> Kabeer.
>