zhipeng93 commented on a change in pull request #28:
URL: https://github.com/apache/flink-ml/pull/28#discussion_r767446298



##########
File path: flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java
##########
@@ -329,33 +342,42 @@ public static void updateExistingParams(Stage<?> stage, Map<Param<?>, Object> pa
      * @param model The model data stream.
      * @param path The parent directory of the model data file.
      * @param modelEncoder The encoder to encode the model data.
+     * @param modelIndex The index of the table to save.
      * @param <T> The class type of the model data.
      */
     public static <T> void saveModelData(
-            DataStream<T> model, String path, Encoder<T> modelEncoder) {
+            DataStream<T> model, String path, Encoder<T> modelEncoder, int modelIndex) {
         FileSink<T> sink =
                 FileSink.forRowFormat(
-                                new org.apache.flink.core.fs.Path(getDataPath(path)), modelEncoder)
+                                new org.apache.flink.core.fs.Path(getDataPath(path, modelIndex)),
+                                modelEncoder)
                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
                         .withBucketAssigner(new BasePathBucketAssigner<>())
                         .build();
         model.sinkTo(sink);
     }
 
     /**
-     * Loads the model data from the given path using the model decoder.
+     * Loads the model table with index `modelIndex` from the given path using the model decoder.
      *
      * @param env A StreamExecutionEnvironment instance.
      * @param path The parent directory of the model data file.
      * @param modelDecoder The decoder used to decode the model data.
+     * @param modelIndex The index of the table to load.
      * @param <T> The class type of the model data.
      * @return The loaded model data.
      */
-    public static <T> DataStream<T> loadModelData(
-            StreamExecutionEnvironment env, String path, SimpleStreamFormat<T> modelDecoder) {
+    public static <T> Table loadModelData(

Review comment:
       I thought there would be cases where the model data contains multiple 
tables. After some offline discussion, we agreed to remove `modelIndex` for now 
and to support multiple tables once a real use case comes up.
   
   We also agreed to let `loadModelData` return a `DataStream`, for two 
reasons: (1) save and load are then symmetric; (2) `saveModelData` and 
`loadModelData` are utility functions for developers, who mostly work with 
`DataStream`s.
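
To make the symmetry argument concrete, here is a minimal plain-Java sketch of a save/load pair that round-trips. It is not the Flink API and not the code in this PR: `List<T>` stands in for `DataStream<T>`, the two `Function` arguments stand in for `Encoder<T>` and `SimpleStreamFormat<T>`, and the class name `ModelDataRoundTrip`, the single-argument `getDataPath` helper, and the `data` file name are assumptions made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java stand-in, NOT the Flink API: List<T> plays the role of DataStream<T>,
// and the Function arguments play the roles of Encoder<T> and SimpleStreamFormat<T>.
class ModelDataRoundTrip {

    // Hypothetical counterpart of getDataPath(path): one fixed file under the parent directory.
    static Path getDataPath(Path parent) {
        return parent.resolve("data");
    }

    // Encodes each record to one line of text and writes all lines to the data file.
    static <T> void saveModelData(List<T> model, Path parent, Function<T, String> encoder)
            throws IOException {
        List<String> lines = new ArrayList<>();
        for (T record : model) {
            lines.add(encoder.apply(record));
        }
        Files.write(getDataPath(parent), lines, StandardCharsets.UTF_8);
    }

    // Mirrors saveModelData: same path argument, and the same container type comes back.
    static <T> List<T> loadModelData(Path parent, Function<String, T> decoder)
            throws IOException {
        List<T> model = new ArrayList<>();
        for (String line : Files.readAllLines(getDataPath(parent), StandardCharsets.UTF_8)) {
            model.add(decoder.apply(line));
        }
        return model;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("model-data");
        List<Double> model = Arrays.asList(0.5, 1.25, -3.0);
        saveModelData(model, dir, d -> Double.toString(d));
        List<Double> loaded = loadModelData(dir, s -> Double.valueOf(s));
        // Reports whether the loaded records equal the saved ones.
        System.out.println(loaded.equals(model));
    }
}
```

Because both methods take the same path and exchange the same container type, a caller can feed the result of `loadModelData` straight back into `saveModelData` without a conversion step, which is the property reason (1) is after.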



