[ 
https://issues.apache.org/jira/browse/MADLIB-1345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869840#comment-16869840
 ] 

Domino Valdano commented on MADLIB-1345:
----------------------------------------

The description of this story seems pretty specific to GD, but another idea I 
think would be worth exploring is using a UDF instead of a UDA, so that we don't 
have to worry about passing state around and so that keras can handle all of 
the batching (i.e., we would have just 1 image per row as our input to fit, 
rather than making our own buffers).

Eliminating the need for buffers would avoid partial batches when keras's 
minibatch size does not divide evenly into the buffer size, giving a more 
even workload.
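
As a small illustration of the partial-batch effect, here is a sketch that 
counts minibatch sizes with and without buffers. The buffer size, image count, 
and minibatch size are made-up numbers for the example, not MADlib defaults, 
and {{batch_sizes()}} is a hypothetical helper.

```python
def batch_sizes(num_images, batch_size):
    """Sizes of the minibatches formed from num_images images."""
    full, remainder = divmod(num_images, batch_size)
    return [batch_size] * full + ([remainder] if remainder else [])

# Hypothetical numbers: 300 images on a segment, minibatch size 32.
# With buffers of 100 images, fit runs once per buffer (3 times).
per_buffer = batch_sizes(100, 32)
# With 1 image per row and a single fit call, keras sees all 300 images.
whole_segment = batch_sizes(300, 32)

print(per_buffer)     # [32, 32, 32, 4] -> a 4-image partial batch per buffer
print(whole_segment)  # nine full batches, then one partial batch of 12
```

With buffers, every buffer ends in its own small partial batch; with a single 
call there is at most one partial batch per segment.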

Here is the strategy that I think should work:
 # The master {{fit()}} function runs a SQL query similar to the current one, 
{{SELECT keras_step(a.independent_var, a.dependent_var, initial_weights) FROM 
input_table as a;}}
 # {{keras_step()}} is defined as a UDF that has {{RETURNS SETOF REAL[]}} in 
its definition. (There is no state, merge, or final function.)
 # {{keras_step()}} runs on each segment, once for each row of that segment. But 
instead of running {{keras_fit()}} every time it's called, it just stores all 
of the independent and dependent vars in a large buffer, which can grow to much 
larger than 1GB. This buffer can be saved in SD the first time it runs, and 
then appended to for each additional row. When it reaches the last row (which 
we already detect), it runs {{keras_fit()}} on the entire buffer of all images 
on the segment, and keras will handle the minibatching.
 # At the end of the {{keras_step()}} function, it returns one of two things:

{{if (last_row):}}
{{    return [weights]}}
{{else:}}
{{    return []}}
 # The result of the {{plpy.execute()}} call made by {{fit()}} on master will 
have N rows, where N is the number of segments, because only the last row of 
each segment returns an actual value. These N rows will contain the final 
weights after training on each segment.
 # {{fit()}} averages these N weights together, weighted by how many images 
there are on each segment, and then uses the resulting weights to kick off the 
next iteration, with another call to {{keras_step()}}
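
The buffering behaviour of {{keras_step()}} described in the steps above can be 
sketched in plain Python. This is only an illustration under assumptions: 
{{is_last_row}} and {{train_fn}} are hypothetical parameters added so the sketch 
runs outside the database (the real UDF would detect the last row itself and 
call {{keras_fit()}}), and {{SD}} is passed in as an ordinary dict standing in 
for the PL/Python session dictionary.

```python
def keras_step(independent_var, dependent_var, initial_weights,
               is_last_row, SD, train_fn):
    """Called once per row: buffer the row in SD, train only on the last row.

    is_last_row and train_fn are hypothetical stand-ins for this sketch.
    """
    buf = SD.setdefault('buffer', {'x': [], 'y': []})
    buf['x'].append(independent_var)
    buf['y'].append(dependent_var)
    if not is_last_row:
        return []                 # empty set: this row contributes no output
    weights = train_fn(buf['x'], buf['y'], initial_weights)
    del SD['buffer']              # start the next iteration with a fresh buffer
    return [weights]              # one output row holding the trained weights


def fake_train(xs, ys, weights):
    # Stand-in for keras_fit(): just shifts each weight by the row count.
    return [w + len(xs) for w in weights]


# Simulate one segment that holds three rows.
SD = {}
rows = [([0.1], 0), ([0.2], 1), ([0.3], 0)]
results = []
for i, (x, y) in enumerate(rows):
    last = (i == len(rows) - 1)
    results.extend(keras_step(x, y, [0.0, 1.0], last, SD, fake_train))

print(results)  # [[3.0, 4.0]] -> a single row for the whole segment
```

Only the final call produces a row, which is why the master query would see 
one result per segment.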

This method is simpler in some ways than the UDA, and probably more efficient 
as no state needs to be passed around by Greenplum, and {{keras_fit()}} is only 
called once on each segment so it can handle all batching itself.
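
The averaging that {{fit()}} does in the last step of the strategy could look 
roughly like the following sketch. {{average_weights()}} is a hypothetical 
helper name, the weights are shown as flat Python lists for simplicity, and 
the numbers are invented for the example.

```python
def average_weights(segment_weights, images_per_seg):
    """Average per-segment weight vectors, weighted by image count per segment."""
    total = float(sum(images_per_seg))
    num_params = len(segment_weights[0])
    return [
        sum(w[i] * n for w, n in zip(segment_weights, images_per_seg)) / total
        for i in range(num_params)
    ]

# Two segments: 100 images on the first, 300 on the second.
avg = average_weights([[1.0, 2.0], [3.0, 6.0]], [100, 300])
print(avg)  # [2.5, 5.0]
```

The segment with more images pulls the average toward its weights, so a segment 
holding only a few images does not count as much as a full one.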

This performance improvement is complementary to the GD improvement, so we 
could combine the two.  By using GD, we would avoid the need for the weights to 
be passed the first time. And by using a UDF, we would avoid passing them 
around in the state between each call to {{fit_transition()}}. Instead, they 
just get set once at the beginning, and returned once after the whole iteration 
is done.

> DL: Performance improvement in DL functions
> -------------------------------------------
>
>                 Key: MADLIB-1345
>                 URL: https://issues.apache.org/jira/browse/MADLIB-1345
>             Project: Apache MADlib
>          Issue Type: Improvement
>          Components: Deep Learning
>            Reporter: Ekta Khanna
>            Priority: Major
>             Fix For: v2.0
>
>
> Currently, we pass around model_data, model_arch, etc. for each buffer/image 
> for fit(), predict() and evaluate(). This causes a lot of overhead and slows 
> down the query considerably.
> We tried to set model_data and model_arch using GD for predict. Following 
> were the runtimes:
> with GD
> ~707 sec(with CPU) - 50K places10_20seg
> without GD
> ~1650 sec(with CPU) - 50K places10_20seg
> Below is the patch for GD changes:
> {code}
> def set_predict_GD(model_architecture, model_data,
>                            is_response, normalizing_const, seg_ids,
>                            images_per_seg, gpus_per_host, segments_per_host,
>                            **kwargs):
>     GD = kwargs['GD']
>     GD['model_architecture'] = model_architecture
>     GD['model_data'] = model_data
>     GD['is_response'] = is_response
>     GD['normalizing_const'] = normalizing_const
>     #GD['current_seg_id'] = current_seg_id
>     GD['seg_ids'] = seg_ids
>     GD['images_per_seg'] = images_per_seg
>     GD['gpus_per_host'] = gpus_per_host
>     GD['segments_per_host'] = segments_per_host
> def predict():
> ....
> set_gd_query=plpy.prepare("""
>            SELECT set_predict_GD
>             ($MAD${model_arch}$MAD$,
>             $1,
>             {is_response},
>             {normalizing_const},
>             -- gp_segment_id,
>             ARRAY{seg_ids_test},
>             ARRAY{images_per_seg_test},
>             {gpus_per_host},
>             {segments_per_host}
>             ) from gp_dist_random('gp_id')
>             """.format(**locals()), ["bytea"])
> # Using gp_dist_random('gp_id') in the query makes the UDF run on each segment
> plpy.execute(set_gd_query, [model_data])
> predict_query = plpy.execute("""
>     CREATE TABLE {output_table} AS
>     SELECT {id_col}, {prediction_select_clause}
>     FROM (
>         SELECT {test_table}.{id_col},
>                ({schema_madlib}.internal_keras_predict
>                    ({independent_varname}, {gp_segment_id_col})
>                ) AS {intermediate_col}
>     FROM {test_table}
>     ) q distributed by ({id_col})
>     """.format(**locals()))
> def internal_keras_predict(independent_var, current_seg_id, **kwargs):
>     start = time.time()
>     SD = kwargs['SD']
>     GD = kwargs['GD']
>     is_response = GD['is_response']
>     normalizing_const = GD['normalizing_const']
>     #current_seg_id = GD['current_seg_id']
>     seg_ids = GD['seg_ids']
>     images_per_seg = GD['images_per_seg']
>     gpus_per_host = GD['gpus_per_host']
>     segments_per_host = GD['segments_per_host']
>     device_name = get_device_name_and_set_cuda_env(gpus_per_host,
>                                                    current_seg_id)
>     ...
> {code}
> With the above changes, we found out that GD is not reliable for GPDB 
> for the following reason:
> Consider a single node gpdb cluster with 3 segments.
> Calling set_gd using gp_dist_random() creates 1 process per seg and sets GD 
> on each of these processes.
> seg1 - pid 100 - gd is set here for seg1
> seg2 - pid 200 - gd is set here for seg2
> seg3 - pid 300 - gd is set here for seg3
> Now, CREATE TABLE ... in predict() spins up 2 processes per seg: the old 
> process where GD was set, plus 1 new process per seg.
> seg1 - pid 100 - gd is set here for seg1 (reused from before)
> seg1 - pid 101 - gd is read here for seg1
> seg2 - pid 200 - gd is set here for seg2 (reused from before)
> seg2 - pid 201 - gd is read here for seg2
> seg3 - pid 300 - gd is set here for seg3 (reused from before)
> seg3 - pid 301 - gd is read here for seg3
> This causes problems because the process where GD is read is not the 
> same as the process where it was set.
> A couple of ways to avoid this problem:
> # Change the predict code to run two plpy execute queries, the first one being 
> the internal predict query and the second one being the create table query.
> # Distribute the source table by the id column and, while creating the predict 
> output table, use that id column as the distribution key.
> We are not sure if this is good enough for all use cases. For example, what if 
> the source table has an index, which might have the same effect as the create 
> table command? Our goal is to keep the query from creating multiple processes.
> # Explore the GD option
> # Explore alternatives so that we don't have to pass the model data for every 
> row/buffer/image in the transition function/udf



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
