kaknikhil commented on a change in pull request #443: DL: Add training for multiple models
URL: https://github.com/apache/madlib/pull/443#discussion_r325813503
 
 

 ##########
 File path: src/ports/postgres/modules/deep_learning/madlib_keras.py_in
 ##########
 @@ -387,70 +396,91 @@ def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
     return (curr_iter)%metrics_compute_frequency == 0 or \
            curr_iter == num_iterations
 
+def init_model(model_architecture, compile_params):
+    """
+        Should only be called at the first row of first iteration.
+    """
+    segment_model = model_from_json(model_architecture)
+    compile_model(segment_model, compile_params)
+    return segment_model
+
+def update_model(segment_model, prev_serialized_weights):
+    """
+        Happens at first row of each iteration.
+    """
+    model_shapes = get_model_shapes(segment_model)
+    model_weights = madlib_keras_serializer.deserialize_as_nd_weights(
+        prev_serialized_weights, model_shapes)
+    segment_model.set_weights(model_weights)
+
 def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
                    independent_var_shape, model_architecture,
                    compile_params, fit_params, current_seg_id, seg_ids,
                    images_per_seg, gpus_per_host, segments_per_host,
-                   prev_serialized_weights, **kwargs):
+                   prev_serialized_weights, is_final_iteration=True,
+                   is_multiple_model=False, **kwargs):
     if not independent_var or not dependent_var:
         return state
-
     start_transition = time.time()
     SD = kwargs['SD']
     device_name = get_device_name_and_set_cuda_env(gpus_per_host,
                                                    current_seg_id)
-    # Set up system if this is the first buffer on segment'
-    if not state:
-        set_keras_session(device_name, gpus_per_host, segments_per_host)
-        segment_model = model_from_json(model_architecture)
-        compile_and_set_weights(segment_model, compile_params, device_name,
-                                prev_serialized_weights)
-
-        SD['segment_model'] = segment_model
-        agg_image_count = 0
+    if is_multiple_model:
+        prev_serialized_weights = madlib_keras_serializer.\
+            get_serialized_1d_weights_from_state(prev_serialized_weights)
+    # If a live session is present, re-use it. Otherwise, recreate it.
+    if SD_NAMES.SESS in SD :
+        if SD_NAMES.SEGMENT_MODEL not in SD:
+            plpy.error("Session and model should exist in SD after the first row"
+                        "of the first iteration")
+        sess = SD[SD_NAMES.SESS]
+        segment_model = SD[SD_NAMES.SEGMENT_MODEL]
+        K.set_session(sess)
     else:
-        segment_model = SD['segment_model']
-        agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
+        sess = get_keras_session(device_name, gpus_per_host, segments_per_host)
+        SD[SD_NAMES.SESS] = sess
+        K.set_session(sess)
+        segment_model = init_model(model_architecture, compile_params)
+        SD[SD_NAMES.SEGMENT_MODEL] = segment_model
+    agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
+    if not state: # first row each iteration
+        update_model(segment_model, prev_serialized_weights)
 
     # Prepare the data
     x_train = np_array_float32(independent_var, independent_var_shape)
     y_train = np_array_int16(dependent_var, dependent_var_shape)
 
     # Fit segment model on data
     start_fit = time.time()
-    with K.tf.device(device_name):
-        #TODO consider not doing this every time
-        fit_params = parse_and_validate_fit_params(fit_params)
-        history = segment_model.fit(x_train, y_train, **fit_params)
+    #TODO consider not doing this every time
+    fit_params = parse_and_validate_fit_params(fit_params)
+    history = segment_model.fit(x_train, y_train, **fit_params)
     end_fit = time.time()
-
     image_count = len(x_train)
+
     # Aggregating number of images, loss and accuracy
     agg_image_count += image_count
-
-    with K.tf.device(device_name):
-        updated_weights = segment_model.get_weights()
-
+    updated_weights = segment_model.get_weights()
     total_images = get_image_count_per_seg_from_array(current_seg_id, seg_ids,
                                                       images_per_seg)
 
-    # Re-serialize the weights
-    # Update image count, check if we are done
+    # last row of the last iteration
     if agg_image_count == total_images:
-        # Once done with all images on a segment, we update weights
-        # with the total number of images here instead of the merge function.
-        # The merge function only deals with aggregating them.
-        updated_weights = [ total_images * w for w in updated_weights ]
-            # In GPDB, each segment would have a keras session, so clear
-            # them after the last buffer is processed.
-        clear_keras_session()
+        if not is_multiple_model:
+            updated_weights = [total_images * w for w in updated_weights]
+        if is_final_iteration or is_multiple_model:
+            del SD[SD_NAMES.SEGMENT_MODEL]
+            del SD[SD_NAMES.SESS]
+            K.clear_session()
 
 Review comment:
   We should refactor the `clear_keras_session` function in `madlib_keras_wrapper.py` as follows:
   ```
   def clear_keras_session(sess=None):
       if sess is None:
           sess = K.get_session()
       sess.close()
       K.clear_session()
   ```
   
   Then we can call `clear_keras_session(sess)` here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to