Baunsgaard commented on a change in pull request #966:
URL: https://github.com/apache/systemml/pull/966#discussion_r437441066



##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               Map<FederatedRange, FederatedData> fedMapping = 
fin.getFedMapping();
+
+               // first we use the spec to construct a meta frame which will 
provide us with info about the encodings
+               List<Pair<FederatedRange, Future<FederatedResponse>>> 
metaFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.ENCODE_META, spec, 
entry.getKey().getBeginDimsInt()[1] + 1), true);
+                       metaFutures.add(new ImmutablePair<>(entry.getKey(), 
response));
+               }
+
+               // TODO support encodings other than recode
+               // the combined mappings for the frame columns (because we only 
support recode)
+               Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) 
fin.getNumColumns()];
+               try {
+                       for(Pair<FederatedRange, Future<FederatedResponse>> 
pair : metaFutures) {
+                               FederatedRange range = pair.getKey();
+                               FederatedResponse federatedResponse = 
pair.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FrameBlock fb = (FrameBlock) 
federatedResponse.getData()[0];
+                                       combineRecodeMaps(combinedRecodeMaps, 
fb, range.getBeginDimsInt()[1]);
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated meta frame 
creation failed: " + e.getMessage());
+               }
+
+               // construct a single meta frameblock out of the multiple 
HashMaps with the recodings
+               FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+               // actually encode the frame block and construct an encoded 
matrix block at worker
+               List<Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       FederatedRange fedRange = entry.getKey();
+                       int columnStart = (int) fedRange.getBeginDims()[1];
+                       int columnEnd = (int) fedRange.getEndDims()[1];
+                       
+                       // Slice out relevant meta part
+                       // range is inclusive
+                       FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() 
- 1, columnStart, columnEnd - 1, null);
+                       
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.FRAME_ENCODE, 
slicedMeta, spec, columnStart + 1), true);
+                       encodedFutures.add(new ImmutablePair<>(entry, 
response));
+               }
+               
+               // construct a federated matrix with the encoded data
+               MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+               
transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+               Map<FederatedRange, FederatedData> transformedFedMapping = new 
HashMap<>();
+               try {
+                       for(Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>> data : encodedFutures) {
+                               FederatedResponse federatedResponse = 
data.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FederatedRange federatedRange = 
data.getKey().getKey();
+                                       FederatedData federatedData = 
data.getKey().getValue();
+                                       long varId = (long) 
federatedResponse.getData()[0];
+                                       
+                                       
transformedFedMapping.put(federatedRange, new FederatedData(federatedData, 
varId));
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated transform 
apply failed: " + e.getMessage());
+               }
+               // set the federated mapping for the matrix
+               transformedMat.setFedMapping(transformedFedMapping);
+
+               // release input and outputs
+               ec.setFrameOutput(getOutput(1).getName(), meta);
+       }
+
+       private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] 
combinedRecodeMaps) {
+               int rows = 0;
+               for(Map<String, Long> map : combinedRecodeMaps) {
+                       if(map != null) {
+                               rows = Integer.max(rows, map.size());
+                       }
+               }
+               FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, 
Types.ValueType.STRING);
+               fb.ensureAllocatedColumns(rows);
+
+               // find maximum number of elements needed for a column
+               int c = -1;
+               for(Map<String, Long> map : combinedRecodeMaps) {

Review comment:
       Maybe parallelize this per column?

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               Map<FederatedRange, FederatedData> fedMapping = 
fin.getFedMapping();
+
+               // first we use the spec to construct a meta frame which will 
provide us with info about the encodings
+               List<Pair<FederatedRange, Future<FederatedResponse>>> 
metaFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.ENCODE_META, spec, 
entry.getKey().getBeginDimsInt()[1] + 1), true);
+                       metaFutures.add(new ImmutablePair<>(entry.getKey(), 
response));
+               }
+
+               // TODO support encodings other than recode
+               // the combined mappings for the frame columns (because we only 
support recode)
+               Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) 
fin.getNumColumns()];
+               try {
+                       for(Pair<FederatedRange, Future<FederatedResponse>> 
pair : metaFutures) {
+                               FederatedRange range = pair.getKey();
+                               FederatedResponse federatedResponse = 
pair.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FrameBlock fb = (FrameBlock) 
federatedResponse.getData()[0];
+                                       combineRecodeMaps(combinedRecodeMaps, 
fb, range.getBeginDimsInt()[1]);
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated meta frame 
creation failed: " + e.getMessage());
+               }
+
+               // construct a single meta frameblock out of the multiple 
HashMaps with the recodings
+               FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+               // actually encode the frame block and construct an encoded 
matrix block at worker
+               List<Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       FederatedRange fedRange = entry.getKey();
+                       int columnStart = (int) fedRange.getBeginDims()[1];
+                       int columnEnd = (int) fedRange.getEndDims()[1];
+                       
+                       // Slice out relevant meta part
+                       // range is inclusive
+                       FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() 
- 1, columnStart, columnEnd - 1, null);
+                       
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.FRAME_ENCODE, 
slicedMeta, spec, columnStart + 1), true);
+                       encodedFutures.add(new ImmutablePair<>(entry, 
response));
+               }
+               
+               // construct a federated matrix with the encoded data
+               MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+               
transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+               Map<FederatedRange, FederatedData> transformedFedMapping = new 
HashMap<>();
+               try {
+                       for(Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>> data : encodedFutures) {
+                               FederatedResponse federatedResponse = 
data.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FederatedRange federatedRange = 
data.getKey().getKey();
+                                       FederatedData federatedData = 
data.getKey().getValue();
+                                       long varId = (long) 
federatedResponse.getData()[0];
+                                       
+                                       
transformedFedMapping.put(federatedRange, new FederatedData(federatedData, 
varId));
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated transform 
apply failed: " + e.getMessage());
+               }
+               // set the federated mapping for the matrix
+               transformedMat.setFedMapping(transformedFedMapping);
+
+               // release input and outputs
+               ec.setFrameOutput(getOutput(1).getName(), meta);
+       }
+
+       private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] 
combinedRecodeMaps) {
+               int rows = 0;
+               for(Map<String, Long> map : combinedRecodeMaps) {
+                       if(map != null) {
+                               rows = Integer.max(rows, map.size());
+                       }
+               }
+               FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, 
Types.ValueType.STRING);
+               fb.ensureAllocatedColumns(rows);

Review comment:
       Am I missing something, or should the number of rows not already be known 
before the call to this function?

##########
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
##########
@@ -115,6 +131,77 @@ private FederatedResponse 
constructResponse(FederatedRequest request) {
                }
        }
 
+       private FederatedResponse createFrameEncodeMeta(FederatedRequest 
request) {
+               // param parsing
+               checkNumParams(request.getNumParams(), 3);
+               String spec = (String) request.getParam(0);
+               int globalOffset = (int) request.getParam(1);
+               long varID = (long) request.getParam(2);
+
+               FrameObject fo = (FrameObject) 
PrivacyMonitor.handlePrivacy(_vars.get(varID));
+               FrameBlock data = fo.acquireRead();
+               String[] colNames = data.getColumnNames();
+
+               // create the encoder
+               Encoder encoder = EncoderFactory.createEncoder(spec,
+                       colNames,
+                       data.getNumColumns(),
+                       null,
+                       globalOffset,
+                       globalOffset + data.getNumColumns());
+               // build necessary structures for encoding
+               encoder.build(data);
+               // just get the meta frame block
+               FrameBlock meta = encoder.getMetaData(new 
FrameBlock(data.getNumColumns(), Types.ValueType.STRING));
+               meta.setColumnNames(colNames);
+               // otherwise data of FrameBlock would be null, therefore it 
would fail
+               // hack because serialization of FrameBlock does not function 
if Arrays are not allocated
+               meta.ensureAllocatedColumns(meta.getNumRows());
+               fo.release();
+
+               return new FederatedResponse(FederatedResponse.Type.SUCCESS, 
meta);
+       }
+
+       private FederatedResponse executeFrameEncode(FederatedRequest request) {

Review comment:
       I'm not really a fan of having this method inside the worker handler.

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));

Review comment:
       Make this a federated output here so that the result stays federated.

##########
File path: 
src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
##########
@@ -142,7 +142,7 @@ public void federatedConstruction(Types.ExecMode execMode, 
String testFile, Stri
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
                }
                fullDMLScriptName = HOME + testFile + ".dml";
-               programArgs = new String[] {"-args", "\"localhost:" + port + 
"/" + input(inputIdentifier) + "\"",
+               programArgs = new String[] {"-args", 
TestUtils.federatedAddress("localhost", port, input(inputIdentifier)),

Review comment:
       I like this TestUtils helper for the federated address. Could you use it 
in more of the federated tests as well?

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               Map<FederatedRange, FederatedData> fedMapping = 
fin.getFedMapping();
+
+               // first we use the spec to construct a meta frame which will 
provide us with info about the encodings
+               List<Pair<FederatedRange, Future<FederatedResponse>>> 
metaFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.ENCODE_META, spec, 
entry.getKey().getBeginDimsInt()[1] + 1), true);
+                       metaFutures.add(new ImmutablePair<>(entry.getKey(), 
response));
+               }
+
+               // TODO support encodings other than recode
+               // the combined mappings for the frame columns (because we only 
support recode)
+               Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) 
fin.getNumColumns()];
+               try {
+                       for(Pair<FederatedRange, Future<FederatedResponse>> 
pair : metaFutures) {
+                               FederatedRange range = pair.getKey();
+                               FederatedResponse federatedResponse = 
pair.getValue().get();
+                               if(federatedResponse.isSuccessful()) {

Review comment:
       Is there ever an else case for this if? If there is, should we not throw an 
exception?

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
##########
@@ -642,6 +642,9 @@ public void setColumn(int c, Array column) {
 
        ///////
        // serialization / deserialization (implementation of writable and 
externalizable)
+       // FIXME for FrameBlock fix write and readFields, it does not work if 
the Arrays are not yet
+       // allocated (after fixing remove hack in 
FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
+       // FrameBlock.ensureAllocatedColumns())

Review comment:
       Maybe open a Jira issue for this?

##########
File path: 
src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
+       private static final String TEST_NAME1 = 
"TransformFederatedEncodeDecode";
+       private static final String TEST_DIR = "functions/transform/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
TransformFederatedEncodeDecodeTest.class.getSimpleName()
+               + "/";
+
+       private static final String SPEC = "TransformEncodeDecodeSpec.json";
+
+       private static final int rows = 1234;
+       private static final int cols = 2;
+       private static final double sparsity1 = 0.9;
+       private static final double sparsity2 = 0.1;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"FO"}));
+       }
+
+       @Test
+       public void runTestCSVDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.CSV);
+       }
+
+       @Test
+       public void runTestCSVSparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.CSV);
+       }
+
+       @Test
+       public void runTestTextcellDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.TEXT);
+       }
+
+       @Test
+       public void runTestTextcellSparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.TEXT);
+       }
+
+       @Test
+       public void runTestBinaryDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.BINARY);
+       }
+
+       @Test
+       public void runTestBinarySparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.BINARY);
+       }
+
+       private void runTransformEncodeDecodeTest(boolean sparse, 
Types.FileFormat format) {
+               ExecMode platformOld = rtplatform;
+               rtplatform = ExecMode.SINGLE_NODE;
+
+               Thread t1 = null, t2 = null;
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+
+                       int port1 = getRandomAvailablePort();
+                       t1 = startLocalFedWorker(port1);
+                       int port2 = getRandomAvailablePort();
+                       t2 = startLocalFedWorker(port2);
+
+                       // schema
+                       Types.ValueType[] schema = new Types.ValueType[cols / 
2];
+                       Arrays.fill(schema, Types.ValueType.FP64);
+                       // generate and write input data
+                       // A is the data that will be aggregated and not recoded
+                       double[][] A = TestUtils.round(getRandomMatrix(rows, 
cols / 2, 1, 15, sparse ? sparsity2 : sparsity1, 7));
+                       writeInputFrameWithMTD("A", A, false, schema, format);
+
+                       // B will be recoded and will be the column that will 
be grouped by
+                       Arrays.fill(schema, Types.ValueType.STRING);
+                       // we set sparsity to 1.0 to ensure all the string 
labels exist
+                       double[][] B = TestUtils.round(getRandomMatrix(rows, 
cols / 2, 1, 15, 1.0, 8));
+                       writeInputFrameWithMTD("B", B, false, schema, format);
+
+                       fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME1 
+ ".dml";
+
+                       programArgs = new String[] {"-explain", "-args", 
TestUtils.federatedAddress("localhost", port1, input("A")),

Review comment:
       If you can live without calling explain in tests, that would be great, 
since it tends to write a little too much to the terminal, which in turn 
crashes our test suite in the workflows.

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {

Review comment:
       Looking at the reference implementation `MultiReturnParameterizedBuiltinCPInstruction`:
   
   The design there moves the logic entirely into the Encoder. Is 
there a reason not to do the same in this case?
   
   ```java
   
        @Override 
        public void processInstruction(ExecutionContext ec) {
                //obtain and pin input frame
                FrameBlock fin = ec.getFrameInput(input1.getName());
                String spec = ec.getScalarInput(input2).getStringValue();
                String[] colnames = fin.getColumnNames(); 
                
                //execute block transform encode
                Encoder encoder = EncoderFactory.createEncoder(spec, colnames, 
fin.getNumColumns(), null);
                MatrixBlock data = encoder.encode(fin, new 
MatrixBlock(fin.getNumRows(), fin.getNumColumns(), false)); //build and apply
                FrameBlock meta = encoder.getMetaData(new 
FrameBlock(fin.getNumColumns(), ValueType.STRING));
                meta.setColumnNames(colnames);
                
                //release input and outputs
                ec.releaseFrameInput(input1.getName());
                ec.setMatrixOutput(getOutput(0).getName(), data);
                ec.setFrameOutput(getOutput(1).getName(), meta);
        }
   ```

##########
File path: 
src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
##########
@@ -115,6 +131,77 @@ private FederatedResponse 
constructResponse(FederatedRequest request) {
                }
        }
 
+       private FederatedResponse createFrameEncodeMeta(FederatedRequest 
request) {

Review comment:
       I'm not really a fan of having this method inside the worker handler.

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);

Review comment:
       In this call, you pass outputs.get(0) to the super constructor.
   
   I see that you just copied this from 
MultiReturnParameterizedBuiltinCPInstruction, but isn't there potentially a 
problem in registering only the first operand of outputs with the superclass, 
if not all methods are correctly overridden?
   
   If possible, maybe pass null instead. That way we would get a 
NullPointerException if we encounter such an error.
   

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               Map<FederatedRange, FederatedData> fedMapping = 
fin.getFedMapping();
+
+               // first we use the spec to construct a meta frame which will 
provide us with info about the encodings
+               List<Pair<FederatedRange, Future<FederatedResponse>>> 
metaFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.ENCODE_META, spec, 
entry.getKey().getBeginDimsInt()[1] + 1), true);
+                       metaFutures.add(new ImmutablePair<>(entry.getKey(), 
response));
+               }
+
+               // TODO support encodings other than recode
+               // the combined mappings for the frame columns (because we only 
support recode)

Review comment:
       I think it is important that we already think about extensions now. 
   Since you implemented it for recode, how would you extend it to other encoders?
   If the answer is complicated, we should consider changing the code to make it 
simpler.

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);

Review comment:
       Maybe this null should be replaced with an actual operator?
   (Again, it is copied from the CP instruction implementation, but maybe we can 
improve it.)
   

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {

Review comment:
       In general, I would highly appreciate it if you could split this function 
into several smaller functions; this will help once we need to add more 
functionality (single responsibility principle).

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               Map<FederatedRange, FederatedData> fedMapping = 
fin.getFedMapping();
+
+               // first we use the spec to construct a meta frame which will 
provide us with info about the encodings
+               List<Pair<FederatedRange, Future<FederatedResponse>>> 
metaFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.ENCODE_META, spec, 
entry.getKey().getBeginDimsInt()[1] + 1), true);
+                       metaFutures.add(new ImmutablePair<>(entry.getKey(), 
response));
+               }
+
+               // TODO support encodings other than recode
+               // the combined mappings for the frame columns (because we only 
support recode)
+               Map<String, Long>[] combinedRecodeMaps = new HashMap[(int) 
fin.getNumColumns()];
+               try {
+                       for(Pair<FederatedRange, Future<FederatedResponse>> 
pair : metaFutures) {
+                               FederatedRange range = pair.getKey();
+                               FederatedResponse federatedResponse = 
pair.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FrameBlock fb = (FrameBlock) 
federatedResponse.getData()[0];
+                                       combineRecodeMaps(combinedRecodeMaps, 
fb, range.getBeginDimsInt()[1]);
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated meta frame 
creation failed: " + e.getMessage());
+               }
+
+               // construct a single meta frameblock out of the multiple 
HashMaps with the recodings
+               FrameBlock meta = frameBlockFromRecodeMaps(combinedRecodeMaps);
+
+               // actually encode the frame block and construct an encoded 
matrix block at worker
+               List<Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>>> encodedFutures = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> entry : 
fedMapping.entrySet()) {
+                       FederatedRange fedRange = entry.getKey();
+                       int columnStart = (int) fedRange.getBeginDims()[1];
+                       int columnEnd = (int) fedRange.getEndDims()[1];
+                       
+                       // Slice out relevant meta part
+                       // range is inclusive
+                       FrameBlock slicedMeta = meta.slice(0, meta.getNumRows() 
- 1, columnStart, columnEnd - 1, null);
+                       
+                       Future<FederatedResponse> response = 
entry.getValue().executeFederatedOperation(new FederatedRequest(
+                               FederatedRequest.FedMethod.FRAME_ENCODE, 
slicedMeta, spec, columnStart + 1), true);
+                       encodedFutures.add(new ImmutablePair<>(entry, 
response));
+               }
+               
+               // construct a federated matrix with the encoded data
+               MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+               
transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+               Map<FederatedRange, FederatedData> transformedFedMapping = new 
HashMap<>();
+               try {
+                       for(Pair<Map.Entry<FederatedRange, FederatedData>, 
Future<FederatedResponse>> data : encodedFutures) {
+                               FederatedResponse federatedResponse = 
data.getValue().get();
+                               if(federatedResponse.isSuccessful()) {
+                                       FederatedRange federatedRange = 
data.getKey().getKey();
+                                       FederatedData federatedData = 
data.getKey().getValue();
+                                       long varId = (long) 
federatedResponse.getData()[0];
+                                       
+                                       
transformedFedMapping.put(federatedRange, new FederatedData(federatedData, 
varId));
+                               }
+                       }
+               }
+               catch(InterruptedException | ExecutionException e) {
+                       throw new DMLRuntimeException("Federated transform 
apply failed: " + e.getMessage());
+               }
+               // set the federated mapping for the matrix
+               transformedMat.setFedMapping(transformedFedMapping);
+
+               // release input and outputs
+               ec.setFrameOutput(getOutput(1).getName(), meta);
+       }
+
+       private FrameBlock frameBlockFromRecodeMaps(Map<String, Long>[] 
combinedRecodeMaps) {
+               int rows = 0;
+               for(Map<String, Long> map : combinedRecodeMaps) {
+                       if(map != null) {
+                               rows = Integer.max(rows, map.size());
+                       }
+               }
+               FrameBlock fb = new FrameBlock(combinedRecodeMaps.length, 
Types.ValueType.STRING);
+               fb.ensureAllocatedColumns(rows);
+
+               // find maximum number of elements needed for a column
+               int c = -1;
+               for(Map<String, Long> map : combinedRecodeMaps) {

Review comment:
       Could this loop be parallelized per column?

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, outputs.get(0), opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {

Review comment:
       Furthermore, consider either parallelizing the for loops that allow it, 
or at least marking them (e.g., with a TODO) as candidates for parallelization.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to