This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 51d8b29  [SYSTEMDS-2554,2558,2561] Initial federated transform encode
51d8b29 is described below

commit 51d8b29f6c0891568da9ec8d489fd5de7565699d
Author: Kevin Innerebner <[email protected]>
AuthorDate: Mon Jun 8 16:55:50 2020 +0200

    [SYSTEMDS-2554,2558,2561] Initial federated transform encode
    
    Encoders: recode, pass-through, composite
    
    Closes #966.
---
 .../controlprogram/federated/FederatedData.java    |   4 +
 .../controlprogram/federated/FederatedRequest.java |   4 +-
 .../federated/FederatedWorkerHandler.java          |  72 ++++++++-
 .../controlprogram/federated/FederationMap.java    |  64 ++++++++
 .../sysds/runtime/instructions/cp/CPOperand.java   |   4 +
 .../runtime/instructions/fed/FEDInstruction.java   |   5 +-
 .../instructions/fed/FEDInstructionUtils.java      |  12 ++
 ...tiReturnParameterizedBuiltinFEDInstruction.java | 137 +++++++++++++++++
 .../sysds/runtime/matrix/data/FrameBlock.java      |   3 +
 .../sysds/runtime/transform/encode/Encoder.java    |  47 ++++++
 .../runtime/transform/encode/EncoderComposite.java |  48 +++++-
 .../runtime/transform/encode/EncoderFactory.java   |  29 ++--
 .../transform/encode/EncoderPassThrough.java       |  32 ++++
 .../runtime/transform/encode/EncoderRecode.java    |  67 ++++++++
 .../sysds/runtime/transform/meta/TfMetaUtils.java  |  69 ++++++---
 .../federated/FederatedConstructionTest.java       |   2 +-
 .../TransformFederatedEncodeDecodeTest.java        | 168 +++++++++++++++++++++
 .../transform/TransformFederatedEncodeDecode.dml   |  37 +++++
 18 files changed, 766 insertions(+), 38 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 1d5f5df..2c5f902 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -76,6 +76,10 @@ public class FederatedData {
                _varID = varID;
        }
        
+       public long getVarID() {
+               return _varID;
+       }
+       
        public String getFilepath() {
                return _filepath;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
index f2d53e4..24be89f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedRequest.java
@@ -34,7 +34,9 @@ public class FederatedRequest implements Serializable {
                READ_VAR, // create variable for local data, read on first 
access
                PUT_VAR,  // receive data from main and store to local variable
                GET_VAR,  // return local variable to main
-               EXEC_INST // execute arbitrary instruction over
+               EXEC_INST, // execute arbitrary instruction over
+               FRAME_ENCODE, // TODO replace with user defined functions
+               CREATE_ENCODER
        }
        
        private RequestType _method;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index b7bbafe..e2332e2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
@@ -30,6 +31,7 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
@@ -44,12 +46,17 @@ import 
org.apache.sysds.runtime.instructions.InstructionParser;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.privacy.DMLPrivacyException;
 import org.apache.sysds.runtime.privacy.PrivacyMonitor;
 import org.apache.sysds.runtime.privacy.PrivacyPropagator;
+import org.apache.sysds.runtime.transform.encode.Encoder;
+import org.apache.sysds.runtime.transform.encode.EncoderFactory;
 import org.apache.sysds.utils.JSONHelper;
 import org.apache.wink.json4j.JSONObject;
 
@@ -108,6 +115,10 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                                        return getVariable(request);
                                case EXEC_INST:
                                        return execInstruction(request);
+                               case CREATE_ENCODER:
+                                       return createFrameEncoder(request);
+                               case FRAME_ENCODE:
+                                       return executeFrameEncode(request);
                                default:
                                        String message = String.format("Method 
%s is not supported.", method);
                                        return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR, new 
FederatedWorkerHandlerException(message));
@@ -117,12 +128,64 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                        return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR, ex);
                }
                catch (Exception ex) {
-                       return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR, 
-                               new FederatedWorkerHandlerException("Exception 
of type " 
+                       return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR,
+                               new FederatedWorkerHandlerException("Exception 
of type "
                                + ex.getClass() + " thrown when processing 
request", ex));
                }
        }
 
+       private FederatedResponse createFrameEncoder(FederatedRequest request) {
+               // param parsing
+               checkNumParams(request.getNumParams(), 2);
+               String spec = (String) request.getParam(0);
+               int globalOffset = (int) request.getParam(1);
+               long varID = request.getID();
+
+               Data dataObject = _ec.getVariable(String.valueOf(varID));
+               FrameObject fo = (FrameObject) 
PrivacyMonitor.handlePrivacy(dataObject);
+               FrameBlock data = fo.acquireRead();
+               String[] colNames = data.getColumnNames();
+
+               // create the encoder
+               Encoder encoder = EncoderFactory.createEncoder(spec, colNames,
+                       data.getNumColumns(), null, globalOffset, globalOffset 
+ data.getNumColumns());
+               // build necessary structures for encoding
+               encoder.build(data);
+               // otherwise data of FrameBlock would be null, therefore it 
would fail
+               // hack because serialization of FrameBlock does not function 
if Arrays are not allocated
+               fo.release();
+
+               return new FederatedResponse(ResponseType.SUCCESS, encoder);
+       }
+
+       private FederatedResponse executeFrameEncode(FederatedRequest request) {
+               checkNumParams(request.getNumParams(), 2);
+               Encoder encoder = (Encoder) request.getParam(0);
+               long newVarID = (long) request.getParam(1);
+               long varID = request.getID();
+
+               Data dataObject = _ec.getVariable(String.valueOf(varID));
+               FrameObject fo = (FrameObject) 
PrivacyMonitor.handlePrivacy(dataObject);
+               FrameBlock data = fo.acquireRead();
+
+               // apply transformation
+               MatrixBlock mbout = encoder.apply(data, new 
MatrixBlock(data.getNumRows(), data.getNumColumns(), false));
+
+               // copy characteristics
+               MatrixCharacteristics mc = new 
MatrixCharacteristics(fo.getDataCharacteristics());
+               MatrixObject mo = new MatrixObject(Types.ValueType.FP64, 
OptimizerUtils.getUniqueTempFileName(),
+                       new MetaDataFormat(mc, FileFormat.BINARY));
+               // set the encoded data
+               mo.acquireModify(mbout);
+               mo.release();
+               fo.release();
+
+               // add it to the list of variables
+               _ec.setVariable(String.valueOf(newVarID), mo);
+               // return id handle
+               return new FederatedResponse(ResponseType.SUCCESS_EMPTY);
+       }
+       
        private FederatedResponse readData(FederatedRequest request) {
                checkNumParams(request.getNumParams(), 2);
                String filename = (String) request.getParam(0);
@@ -143,7 +206,7 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                                break;
                        default:
                                // should NEVER happen (if we keep request 
codes in sync with actual behaviour)
-                               return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR, 
+                               return new 
FederatedResponse(FederatedResponse.ResponseType.ERROR,
                                        new 
FederatedWorkerHandlerException("Could not recognize datatype"));
                }
                
@@ -168,6 +231,9 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                        throw new DMLRuntimeException(ex);
                }
                cd.setMetaData(new MetaDataFormat(mc, fmt));
+               // TODO send FileFormatProperties with request and use them for 
CSV, this is currently a workaround so reading
+               //  of CSV files works
+               cd.setFileFormatProperties(new FileFormatPropertiesCSV());
                cd.acquireRead();
                cd.refreshMetaData(); //in pinned state
                cd.release();
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index d2e2300..f224da2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -24,7 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.function.BiFunction;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -35,6 +38,7 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.Reques
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class FederationMap
 {
@@ -150,4 +154,64 @@ public class FederationMap
                }
                return this;
        }
+
+       /**
+        * Execute a function for each <code>FederatedRange</code> + 
<code>FederatedData</code> pair. The function should
+        * not change any data of the pair and instead use 
<code>mapParallel</code> if that is a necessity. Note that this
+        * operation is parallel and necessary synchronisation has to be 
performed.
+        * 
+        * @param forEachFunction function to execute for each pair
+        */
+       public void forEachParallel(BiFunction<FederatedRange, FederatedData, 
Void> forEachFunction) {
+               ExecutorService pool = CommonThreadPool.get(_fedMap.size());
+
+               ArrayList<MappingTask> mappingTasks = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> fedMap : 
_fedMap.entrySet())
+                       mappingTasks.add(new MappingTask(fedMap.getKey(), 
fedMap.getValue(), forEachFunction, _ID));
+               CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
+       }
+
+       /**
+        * Execute a function for each <code>FederatedRange</code> + 
<code>FederatedData</code> pair mapping the pairs to
+        * their new form by directly changing both <code>FederatedRange</code> 
and <code>FederatedData</code>. The varIDs
+        * don't have to be changed by the <code>mappingFunction</code> as that 
will be done by this method. Note that this
+        * operation is parallel and necessary synchronisation has to be 
performed.
+        *
+        * @param newVarID        the new varID to be used by the new 
FederationMap
+        * @param mappingFunction the function directly changing ranges and data
+        * @return the new <code>FederationMap</code>
+        */
+       public FederationMap mapParallel(long newVarID, 
BiFunction<FederatedRange, FederatedData, Void> mappingFunction) {
+               ExecutorService pool = CommonThreadPool.get(_fedMap.size());
+
+               FederationMap fedMapCopy = copyWithNewID(_ID);
+               ArrayList<MappingTask> mappingTasks = new ArrayList<>();
+               for(Map.Entry<FederatedRange, FederatedData> fedMap : 
fedMapCopy._fedMap.entrySet())
+                       mappingTasks.add(new MappingTask(fedMap.getKey(), 
fedMap.getValue(), mappingFunction, newVarID));
+               CommonThreadPool.invokeAndShutdown(pool, mappingTasks);
+               fedMapCopy._ID = newVarID;
+               return fedMapCopy;
+       }
+
+       private static class MappingTask implements Callable<Void> {
+               private final FederatedRange _range;
+               private final FederatedData _data;
+               private final BiFunction<FederatedRange, FederatedData, Void> 
_mappingFunction;
+               private final long _varID;
+
+               public MappingTask(FederatedRange range, FederatedData data,
+                       BiFunction<FederatedRange, FederatedData, Void> 
mappingFunction, long varID) {
+                       _range = range;
+                       _data = data;
+                       _mappingFunction = mappingFunction;
+                       _varID = varID;
+               }
+
+               @Override
+               public Void call() throws Exception {
+                       _mappingFunction.apply(_range, _data);
+                       _data.setVarID(_varID);
+                       return null;
+               }
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPOperand.java 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPOperand.java
index 97c6e1e..3afb681 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPOperand.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPOperand.java
@@ -93,6 +93,10 @@ public class CPOperand
        public boolean isMatrix() {
                return _dataType.isMatrix();
        }
+       
+       public boolean isFrame() {
+               return _dataType.isFrame();
+       }
 
        public boolean isTensor() {
                return _dataType.isTensor();
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
index fc064eb..f2d0791 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
@@ -27,11 +27,12 @@ import org.apache.sysds.runtime.privacy.PrivacyPropagator;
 public abstract class FEDInstruction extends Instruction {
        
        public enum FEDType {
-               Init,
                AggregateBinary,
                AggregateUnary,
                Append,
-               Binary
+               Binary,
+               Init,
+               MultiReturnParameterizedBuiltin
        }
        
        protected final FEDType _fedType;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index c6927df..d639baa 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.instructions.Instruction;
@@ -64,6 +65,17 @@ public class FEDInstructionUtils {
                                        return 
BinaryFEDInstruction.parseInstruction(inst.getInstructionString());
                        }
                }
+               else if (inst instanceof 
MultiReturnParameterizedBuiltinCPInstruction) {
+                       MultiReturnParameterizedBuiltinCPInstruction 
instruction = (MultiReturnParameterizedBuiltinCPInstruction) inst;
+                       String opcode = instruction.getOpcode();
+                       if(opcode.equals("transformencode") && 
instruction.input1.isFrame()) {
+                               CacheableData<?> fo = 
ec.getCacheableData(instruction.input1);
+                               if(fo.isFederated()) {
+                                       return 
MultiReturnParameterizedBuiltinFEDInstruction
+                                               
.parseInstruction(instruction.getInstructionString());
+                               }
+                       }
+               }
                return inst;
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
new file mode 100644
index 0000000..0aad335
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/fed/MultiReturnParameterizedBuiltinFEDInstruction.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import 
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.transform.encode.Encoder;
+import org.apache.sysds.runtime.transform.encode.EncoderComposite;
+import org.apache.sysds.runtime.transform.encode.EncoderPassThrough;
+import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+
+public class MultiReturnParameterizedBuiltinFEDInstruction extends 
ComputationFEDInstruction {
+       protected final ArrayList<CPOperand> _outputs;
+
+       private MultiReturnParameterizedBuiltinFEDInstruction(Operator op, 
CPOperand input1, CPOperand input2,
+               ArrayList<CPOperand> outputs, String opcode, String istr) {
+               super(FEDType.MultiReturnParameterizedBuiltin, op, input1, 
input2, null, opcode, istr);
+               _outputs = outputs;
+       }
+
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+
+       public static MultiReturnParameterizedBuiltinFEDInstruction 
parseInstruction(String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<>();
+               String opcode = parts[0];
+
+               if(opcode.equalsIgnoreCase("transformencode")) {
+                       // one input and two outputs
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add(new CPOperand(parts[3], 
Types.ValueType.FP64, Types.DataType.MATRIX));
+                       outputs.add(new CPOperand(parts[4], 
Types.ValueType.STRING, Types.DataType.FRAME));
+                       return new 
MultiReturnParameterizedBuiltinFEDInstruction(null, in1, in2, outputs, opcode, 
str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in 
MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // obtain and pin input frame
+               FrameObject fin = ec.getFrameObject(input1.getName());
+               String spec = ec.getScalarInput(input2).getStringValue();
+
+               // the encoder in which the complete encoding information will 
be aggregated
+               EncoderComposite globalEncoder = new EncoderComposite(
+                       Arrays.asList(new EncoderRecode(), new 
EncoderPassThrough()));
+               // first create encoders at the federated workers, then collect 
them and aggregate them to a single large
+               // encoder
+               FederationMap fedMapping = fin.getFedMapping();
+               fedMapping.forEachParallel((range, data) -> {
+                       int columnOffset = (int) range.getBeginDims()[1] + 1;
+
+                       // create an encoder with the given spec. The 
columnOffset (which is 1 based) has to be used to
+                       // tell the federated worker how much the indexes in 
the spec have to be offset.
+                       Future<FederatedResponse> response = 
data.executeFederatedOperation(
+                               new 
FederatedRequest(RequestType.CREATE_ENCODER, data.getVarID(), spec, 
columnOffset));
+                       // collect responses with encoders
+                       try {
+                               Encoder encoder = (Encoder) 
response.get().getData()[0];
+                               // merge this encoder into a composite encoder
+                               synchronized(globalEncoder) {
+                                       globalEncoder.mergeAt(encoder, 
columnOffset);
+                               }
+                       }
+                       catch(Exception e) {
+                               throw new DMLRuntimeException("Federated 
encoder creation failed: " + e.getMessage());
+                       }
+                       return null;
+               });
+               long varID = FederationUtils.getNextFedDataID();
+               FederationMap transformedFedMapping = 
fedMapping.mapParallel(varID, (range, data) -> {
+                       int colStart = (int) range.getBeginDims()[1] + 1;
+                       int colEnd = (int) range.getEndDims()[1] + 1;
+                       // get the encoder segment that is relevant for this 
federated worker
+                       Encoder encoder = 
globalEncoder.subRangeEncoder(colStart, colEnd);
+                       try {
+                               FederatedResponse response = 
data.executeFederatedOperation(
+                                       new 
FederatedRequest(RequestType.FRAME_ENCODE, data.getVarID(), encoder, 
varID)).get();
+                               if(!response.isSuccessful())
+                                       response.throwExceptionFromResponse();
+                       }
+                       catch(Exception e) {
+                               throw new DMLRuntimeException(e);
+                       }
+                       return null;
+               });
+
+               // construct a federated matrix with the encoded data
+               MatrixObject transformedMat = ec.getMatrixObject(getOutput(0));
+               
transformedMat.getDataCharacteristics().set(fin.getDataCharacteristics());
+               // set the federated mapping for the matrix
+               transformedMat.setFedMapping(transformedFedMapping);
+
+               // release input and outputs
+               ec.setFrameOutput(getOutput(1).getName(),
+                       globalEncoder.getMetaData(new 
FrameBlock(globalEncoder.getNumCols(), Types.ValueType.STRING)));
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 7ae6b53..9605380 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -664,6 +664,9 @@ public class FrameBlock implements CacheBlock, 
Externalizable
 
        ///////
        // serialization / deserialization (implementation of writable and 
externalizable)
+       // FIXME for FrameBlock fix write and readFields, it does not work if 
the Arrays are not yet
+       // allocated (after fixing remove hack in 
FederatedWorkerHandler.createFrameEncodeMeta(FederatedRequest) call to
+       // FrameBlock.ensureAllocatedColumns())
        
        @Override
        public void write(DataOutput out) throws IOException {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
index 912325f..4c0ad9e 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/Encoder.java
@@ -22,8 +22,11 @@ package org.apache.sysds.runtime.transform.encode;
 import java.io.Serializable;
 import java.util.Arrays;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -123,6 +126,50 @@ public abstract class Encoder implements Serializable
        public abstract MatrixBlock apply(FrameBlock in, MatrixBlock out);
 
        /**
+        * Returns a new Encoder that only handles a sub range of columns.
+        * 
+        * @param colStart the start index of the sub-range (1-based, inclusive)
+        * @param colEnd   the end index of the sub-range (1-based, exclusive)
+        * @return an encoder of the same type, just for the sub-range
+        */
+       public Encoder subRangeEncoder(int colStart, int colEnd) {
+               throw new DMLRuntimeException(
+                       this.getClass().getName() + " does not support the 
creation of a sub-range encoder");
+       }
+
+       /**
+        * Merges the column information, like how many columns the frame needs 
and which columns this encoder operates on.
+        * 
+        * @param other the other encoder of the same type
+        * @param col   column at which the second encoder will be merged in 
(1-based)
+        */
+       protected void mergeColumnInfo(Encoder other, int col) {
+               // update number of columns
+               _clen = Math.max(_colList.length, col - 1 + other.getNumCols());
+
+               // update the new columns that this encoder operates on
+               Set<Integer> colListAgg = new HashSet<>(); // for dedup
+               for(int i : _colList)
+                       colListAgg.add(i);
+               for(int i : other._colList)
+                       colListAgg.add(col - 1 + i);
+               _colList = colListAgg.stream().mapToInt(i -> i).toArray();
+       }
+
+       /**
+        * Merges another encoder, of a compatible type, in after a certain 
position. Resizes as necessary.
+        * <code>Encoders</code> are compatible with themselves and 
<code>EncoderComposite</code> is compatible with every
+        * other <code>Encoder</code>.
+        * 
+        * @param other the encoder that should be merged in
+        * @param col   the position where it should be placed (1-based)
+        */
+       public void mergeAt(Encoder other, int col) {
+               throw new DMLRuntimeException(
+                       this.getClass().getName() + " does not support merging 
with " + other.getClass().getName());
+       }
+
+       /**
         * Construct a frame block out of the transform meta data.
         * 
         * @param out output frame block
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderComposite.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderComposite.java
index 08272e0..e653307 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderComposite.java
@@ -19,10 +19,12 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
@@ -39,7 +41,7 @@ public class EncoderComposite extends Encoder
        private List<Encoder> _encoders = null;
        private FrameBlock _meta = null;
        
-       protected EncoderComposite(List<Encoder> encoders) {
+       public EncoderComposite(List<Encoder> encoders) {
                super(null, -1);
                _encoders = encoders;
        }
@@ -100,7 +102,49 @@ public class EncoderComposite extends Encoder
                }
                return out;
        }
-       
+
+       @Override
+       public Encoder subRangeEncoder(int colStart, int colEnd) {
+               List<Encoder> subRangeEncoders = new ArrayList<>();
+               for (Encoder encoder : _encoders) {
+                       Encoder subEncoder = encoder.subRangeEncoder(colStart, 
colEnd);
+                       if (subEncoder != null) {
+                               subRangeEncoders.add(subEncoder);
+                       }
+               }
+               return new EncoderComposite(subRangeEncoders);
+       }
+
+       @Override
+       public void mergeAt(Encoder other, int col) {
+               if (other instanceof EncoderComposite) {
+                       EncoderComposite otherComposite = (EncoderComposite) 
other;
+                       // TODO maybe assert that the _encoders never have the 
same type of encoder twice or more
+                       for (Encoder otherEnc : otherComposite.getEncoders()) {
+                               boolean mergedIn = false;
+                               for (Encoder encoder : _encoders) {
+                                       if (encoder.getClass() == 
otherEnc.getClass()) {
+                                               encoder.mergeAt(otherEnc, col);
+                                               mergedIn = true;
+                                               break;
+                                       }
+                               }
+                               if(!mergedIn) {
+                                       throw new DMLRuntimeException("Tried to 
merge in encoder of class that is not present in "
+                                               + "CompositeEncoder: " + 
otherEnc.getClass().getSimpleName());
+                               }
+                       }
+                       return;
+               }
+               for (Encoder encoder : _encoders) {
+                       if (encoder.getClass() == other.getClass()) {
+                               encoder.mergeAt(other, col);
+                               return;
+                       }
+               }
+               super.mergeAt(other, col);
+       }
+
        @Override
        public FrameBlock getMetaData(FrameBlock out) {
                if( _meta != null )
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
index b7443f4..dcd2b1c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
@@ -41,6 +41,11 @@ public class EncoderFactory
        public static Encoder createEncoder(String spec, String[] colnames, int 
clen, FrameBlock meta) {
                return createEncoder(spec, colnames, 
UtilFunctions.nCopies(clen, ValueType.STRING), meta);
        }
+       
+       public static Encoder createEncoder(String spec, String[] colnames, int 
clen, FrameBlock meta, int minCol,
+               int maxCol) {
+               return createEncoder(spec, colnames, 
UtilFunctions.nCopies(clen, ValueType.STRING), meta, minCol, maxCol);
+       }
 
        public static Encoder createEncoder(String spec, String[] colnames, 
ValueType[] schema, int clen, FrameBlock meta) {
                ValueType[] lschema = (schema==null) ? 
UtilFunctions.nCopies(clen, ValueType.STRING) : schema;
@@ -48,6 +53,11 @@ public class EncoderFactory
        }
        
        public static Encoder createEncoder(String spec, String[] colnames, 
ValueType[] schema, FrameBlock meta) {
+               return createEncoder(spec, colnames, schema, meta, -1, -1);
+       }
+       
+       public static Encoder createEncoder(String spec, String[] colnames, 
ValueType[] schema, FrameBlock meta, int minCol,
+               int maxCol) {
                Encoder encoder = null;
                int clen = schema.length;
                
@@ -55,21 +65,21 @@ public class EncoderFactory
                        //parse transform specification
                        JSONObject jSpec = new JSONObject(spec);
                        List<Encoder> lencoders = new ArrayList<>();
-               
+                       
                        //prepare basic id lists (recode, feature hash, 
dummycode, pass-through)
                        List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject(
-                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.RECODE.toString())));
+                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.RECODE.toString(), minCol, maxCol)));
                        List<Integer>haIDs = Arrays.asList(ArrayUtils.toObject(
-                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.HASH.toString())));
+                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.HASH.toString(), minCol, maxCol)));
                        List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject(
-                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.DUMMYCODE.toString())));
+                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.DUMMYCODE.toString(), minCol, maxCol)));
                        List<Integer> binIDs = 
TfMetaUtils.parseBinningColIDs(jSpec, colnames);
                        //note: any dummycode column requires recode as 
preparation, unless it follows binning
                        rcIDs = except(unionDistinct(rcIDs, except(dcIDs, 
binIDs)), haIDs);
                        List<Integer> ptIDs = 
except(except(UtilFunctions.getSeqList(1, clen, 1),
                                unionDistinct(rcIDs,haIDs)), binIDs);
                        List<Integer> oIDs = Arrays.asList(ArrayUtils.toObject(
-                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.OMIT.toString())));
+                               TfMetaUtils.parseJsonIDList(jSpec, colnames, 
TfMethod.OMIT.toString(), minCol, maxCol)));
                        List<Integer> mvIDs = Arrays.asList(ArrayUtils.toObject(
                                TfMetaUtils.parseJsonObjectIDList(jSpec, 
colnames, TfMethod.IMPUTE.toString())));
                        
@@ -86,7 +96,7 @@ public class EncoderFactory
                        }
                        if( !ptIDs.isEmpty() )
                                lencoders.add(new EncoderPassThrough(
-                                       
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));
+                                               
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));
                        if( !binIDs.isEmpty() )
                                lencoders.add(new EncoderBin(jSpec, colnames, 
schema.length));
                        if( !dcIDs.isEmpty() )
@@ -105,8 +115,8 @@ public class EncoderFactory
                        //initialize meta data w/ robustness for superset of 
cols
                        if( meta != null ) {
                                String[] colnames2 = meta.getColumnNames();
-                               if( !TfMetaUtils.isIDSpec(jSpec) && 
colnames!=null && colnames2!=null 
-                                       && !ArrayUtils.isEquals(colnames, 
colnames2) ) 
+                               if( !TfMetaUtils.isIDSpec(jSpec) && 
colnames!=null && colnames2!=null
+                                       && !ArrayUtils.isEquals(colnames, 
colnames2) )
                                {
                                        HashMap<String, Integer> colPos = 
getColumnPositions(colnames2);
                                        //create temporary meta frame block w/ 
shallow column copy
@@ -115,7 +125,7 @@ public class EncoderFactory
                                        for( int i=0; i<colnames.length; i++ ) {
                                                if( 
!colPos.containsKey(colnames[i]) ) {
                                                        throw new 
DMLRuntimeException("Column name not found in meta data: "
-                                                               +colnames[i]+" 
(meta: "+Arrays.toString(colnames2)+")");
+                                                               + colnames[i]+" 
(meta: "+Arrays.toString(colnames2)+")");
                                                }
                                                int pos = 
colPos.get(colnames[i]);
                                                meta2.setColumn(i, 
meta.getColumn(pos));
@@ -129,7 +139,6 @@ public class EncoderFactory
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
-               
                return encoder;
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderPassThrough.java
index c5eb873..8b3d36a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderPassThrough.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
@@ -38,6 +40,10 @@ public class EncoderPassThrough extends Encoder
        protected EncoderPassThrough(int[] ptCols, int clen) {
                super(ptCols, clen); //1-based 
        }
+       
+       public EncoderPassThrough() {
+               this(new int[0], 0);
+       }
 
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
@@ -64,6 +70,32 @@ public class EncoderPassThrough extends Encoder
                
                return out;
        }
+       
+       @Override
+       public Encoder subRangeEncoder(int colStart, int colEnd) {
+               if (colStart - 1 >= _clen)
+                       return null;
+               
+               List<Integer> colList = new ArrayList<>();
+               for (int col : _colList) {
+                       if (col >= colStart && col < colEnd)
+                               // shift the column id by the number of columns cut off before the range start
+                               colList.add(col - (colStart - 1));
+               }
+               if (colList.isEmpty())
+                       // empty encoder -> return null
+                       return null;
+               return new EncoderPassThrough(colList.stream().mapToInt(i -> 
i).toArray(), colEnd - colStart);
+       }
+       
+       @Override
+       public void mergeAt(Encoder other, int col) {
+               if(other instanceof EncoderPassThrough) {
+                       mergeColumnInfo(other, col);
+                       return;
+               }
+               super.mergeAt(other, col);
+       }
 
        @Override
        public FrameBlock getMetaData(FrameBlock meta) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
index 8a8bbef..fe3f5b1 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderRecode.java
@@ -19,9 +19,12 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.wink.json4j.JSONException;
@@ -47,6 +50,14 @@ public class EncoderRecode extends Encoder
                _colList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, 
TfMethod.RECODE.toString());
        }
        
+       private EncoderRecode(int[] colList, int clen) {
+               super(colList, clen);
+       }
+       
+       public EncoderRecode() {
+               this(new int[0], 0);
+       }
+       
        public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
                return _rcdMaps; 
        }
@@ -148,6 +159,62 @@ public class EncoderRecode extends Encoder
        }
 
        @Override
+       public Encoder subRangeEncoder(int colStart, int colEnd) {
+               if (colStart - 1 >= _clen)
+                       return null;
+               
+               List<Integer> cols = new ArrayList<>();
+               HashMap<Integer, HashMap<String, Long>> rcdMaps = new 
HashMap<>();
+               for (int col : _colList) {
+                       if (col >= colStart && col < colEnd) {
+                               // shift the column id by the number of columns cut off before the range start
+                               // (subtract colStart - 1 because colStart is 1-based)
+                               int corrColumn = col - (colStart - 1);
+                               cols.add(corrColumn);
+                               // copy rcdMap for column
+                               rcdMaps.put(corrColumn, new 
HashMap<>(_rcdMaps.get(col)));
+                       }
+               }
+               if (cols.isEmpty())
+                       // empty encoder -> sub range encoder does not exist
+                       return null;
+               
+               int[] colList = cols.stream().mapToInt(i -> i).toArray();
+               EncoderRecode subRangeEncoder = new EncoderRecode(colList, 
colEnd - colStart);
+               subRangeEncoder._rcdMaps = rcdMaps;
+               return subRangeEncoder;
+       }
+
+       @Override
+       public void mergeAt(Encoder other, int col) {
+               if(other instanceof EncoderRecode) {
+                       mergeColumnInfo(other, col);
+                       
+                       // merge together overlapping columns or add new columns
+                       EncoderRecode otherRec = (EncoderRecode) other;
+                       for (int otherColID : other._colList) {
+                               int colID = otherColID + col - 1;
+                               //allocate column map if necessary
+                               if( !_rcdMaps.containsKey(colID) )
+                                       _rcdMaps.put(colID, new HashMap<>());
+                               
+                               HashMap<String, Long> otherMap = 
otherRec._rcdMaps.get(otherColID);
+                               if(otherMap != null) {
+                                       // for each column, add all non present 
recode values
+                                       for(Map.Entry<String, Long> entry : 
otherMap.entrySet()) {
+                                               if (lookupRCDMap(colID, 
entry.getKey()) == -1) {
+                                                       // key does not yet 
exist
+                                                       
putCode(_rcdMaps.get(colID), entry.getKey());
+                                               }
+                                       }
+                               }
+                       }
+                       return;
+               }
+               super.mergeAt(other, col);
+       }
+
+       @Override
        public FrameBlock getMetaData(FrameBlock meta) {
                if( !isApplicable() )
                        return meta;
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/meta/TfMetaUtils.java 
b/src/main/java/org/apache/sysds/runtime/transform/meta/TfMetaUtils.java
index cb9ad41..39f5650 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/meta/TfMetaUtils.java
@@ -25,18 +25,16 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
 import org.apache.sysds.api.jmlc.Connection;
-import org.apache.sysds.lops.Lop;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
@@ -47,6 +45,9 @@ import 
org.apache.sysds.runtime.transform.decode.DecoderRecode;
 import org.apache.sysds.runtime.util.CollectionUtils;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
 
 public class TfMetaUtils 
 {
@@ -80,17 +81,33 @@ public class TfMetaUtils
        
        /**
         * TODO consolidate external and internal json spec definitions
-        * 
+        *
+        * @param spec transform specification as json string
+        * @param colnames column names
+        * @param group attribute name in json class
+        * @return list of column ids
+        * @throws JSONException if JSONException occurs
+        */
+       public static int[] parseJsonIDList(JSONObject spec, String[] colnames, 
String group)
+                       throws JSONException
+       {
+               return parseJsonIDList(spec, colnames, group, -1, -1);
+       }
+       
+       /**
         * @param spec transform specification as json string
         * @param colnames column names
-        * @param group ?
+        * @param group attribute name in json class
+        * @param minCol first column of the range to keep (1-based, inclusive; 
-1 disables filtering); columns outside the range are ignored
+        * @param maxCol end of the range to keep (1-based, exclusive; -1 
disables filtering); kept ids are shifted to start at 1
         * @return list of column ids
         * @throws JSONException if JSONException occurs
         */
-       public static int[] parseJsonIDList(JSONObject spec, String[] colnames, 
String group) 
+       public static int[] parseJsonIDList(JSONObject spec, String[] colnames, 
String group, int minCol, int maxCol)
                throws JSONException
        {
-               int[] colList = new int[0];
+               List<Integer> colList = new ArrayList<>();
+               int[] arr = new int[0];
                boolean ids = spec.containsKey("ids") && spec.getBoolean("ids");
                
                if( spec.containsKey(group) ) {
@@ -104,21 +121,35 @@ public class TfMetaUtils
                                attrs = (JSONArray)spec.get(group);
                        
                        //construct ID list array
-                       colList = new int[attrs.size()];
-                       for(int i=0; i < colList.length; i++) {
-                               colList[i] = ids ? 
UtilFunctions.toInt(attrs.get(i)) : 
-                                       (ArrayUtils.indexOf(colnames, 
attrs.get(i)) + 1);
-                               if( colList[i] <= 0 ) {
-                                       throw new RuntimeException("Specified 
column '" + 
-                                               attrs.get(i)+"' does not 
exist.");
+                       for(int i=0; i < attrs.length(); i++) {
+                               int ix;
+                               if (ids) {
+                                       ix = UtilFunctions.toInt(attrs.get(i));
+                                       if(maxCol != -1 && ix >= maxCol)
+                                               ix = -1;
+                                       if(minCol != -1 && ix >= 0)
+                                               ix -= minCol - 1;
+                               }
+                               else {
+                                       ix = ArrayUtils.indexOf(colnames, 
attrs.get(i)) + 1;
                                }
+                               if(ix <= 0) {
+                                       if (minCol == -1 && maxCol == -1) {
+                                               // a non-positive ix is only 
expected when a column range was cut off; otherwise it is an error
+                                               throw new 
RuntimeException("Specified column '"
+                                                       + attrs.get(i)+"' does 
not exist.");
+                                       }
+                                       else // ignore column
+                                               continue;
+                               }
+                               colList.add(ix);
                        }
-               
+                       
                        //ensure ascending order of column IDs
-                       Arrays.sort(colList);
+                       arr = colList.stream().mapToInt((i) -> i)
+                               .sorted().toArray();
                }
-               
-               return colList;
+               return arr;
        }
 
        public static int[] parseJsonObjectIDList(JSONObject spec, String[] 
colnames, String group) 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
index b4ce148..aa88027 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedConstructionTest.java
@@ -91,7 +91,7 @@ public class FederatedConstructionTest extends 
AutomatedTestBase {
        /*
         * like other federated functionality, SPARK execution mode is not yet 
working (waiting for better integration of
         * federated instruction building, like propagating information that 
object is federated)
-        * 
+        *
         * @Test public void federatedFrameConstructionSP() throws IOException {
         * federatedFrameConstruction(Types.ExecMode.SPARK); }
         */
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
new file mode 100644
index 0000000..df9def8
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.transform;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.io.FrameReader;
+import org.apache.sysds.runtime.io.FrameReaderFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TransformFederatedEncodeDecodeTest extends AutomatedTestBase {
+       private static final String TEST_NAME1 = 
"TransformFederatedEncodeDecode";
+       private static final String TEST_DIR = "functions/transform/";
+       private static final String TEST_CLASS_DIR = 
TEST_DIR+TransformFederatedEncodeDecodeTest.class.getSimpleName()+"/";
+
+       private static final String SPEC = "TransformEncodeDecodeSpec.json";
+
+       private static final int rows = 1234;
+       private static final int cols = 2;
+       private static final double sparsity1 = 0.9;
+       private static final double sparsity2 = 0.1;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"FO"}));
+       }
+
+       @Test
+       public void runTestCSVDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.CSV);
+       }
+
+       @Test
+       public void runTestCSVSparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.CSV);
+       }
+
+       @Test
+       public void runTestTextcellDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.TEXT);
+       }
+
+       @Test
+       public void runTestTextcellSparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.TEXT);
+       }
+
+       @Test
+       public void runTestBinaryDenseCP() {
+               runTransformEncodeDecodeTest(false, Types.FileFormat.BINARY);
+       }
+
+       @Test
+       public void runTestBinarySparseCP() {
+               runTransformEncodeDecodeTest(true, Types.FileFormat.BINARY);
+       }
+
+       private void runTransformEncodeDecodeTest(boolean sparse, 
Types.FileFormat format) {
+               ExecMode platformOld = rtplatform;
+               rtplatform = ExecMode.SINGLE_NODE;
+
+               Thread t1 = null, t2 = null, t3 = null, t4 = null;
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+
+                       int port1 = getRandomAvailablePort();
+                       t1 = startLocalFedWorker(port1);
+                       int port2 = getRandomAvailablePort();
+                       t2 = startLocalFedWorker(port2);
+                       int port3 = getRandomAvailablePort();
+                       t3 = startLocalFedWorker(port3);
+                       int port4 = getRandomAvailablePort();
+                       t4 = startLocalFedWorker(port4);
+
+                       // schema
+                       Types.ValueType[] schema = new Types.ValueType[cols / 
2];
+                       Arrays.fill(schema, Types.ValueType.FP64);
+                       // generate and write input data
+                       // A is the data that will be aggregated and not recoded
+                       double[][] A = TestUtils.round(getRandomMatrix(rows, 
cols / 2, 1, 15, sparse ? sparsity2 : sparsity1, 7));
+                       double[][] AUpper = Arrays.copyOf(A, rows / 2);
+                       double[][] ALower = Arrays.copyOfRange(A, rows / 2, 
rows);
+                       writeInputFrameWithMTD("AU", AUpper, false, schema, 
format);
+                       writeInputFrameWithMTD("AL", ALower, false, schema, 
format);
+
+                       // B will be recoded and will be the column that will 
be grouped by
+                       Arrays.fill(schema, Types.ValueType.STRING);
+                       // we set sparsity to 1.0 to ensure all the string 
labels exist
+                       double[][] B = TestUtils.round(getRandomMatrix(rows, 
cols / 2, 1, 15, 1.0, 8));
+                       double[][] BUpper = Arrays.copyOf(B, rows / 2);
+                       double[][] BLower = Arrays.copyOfRange(B, rows / 2, 
rows);
+                       writeInputFrameWithMTD("BU", BUpper, false, schema, 
format);
+                       writeInputFrameWithMTD("BL", BLower, false, schema, 
format);
+
+                       fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME1 
+ ".dml";
+
+                       programArgs = new String[] {"-nvargs",
+                               "in_AU=" + 
TestUtils.federatedAddress("localhost", port1, input("AU")),
+                               "in_AL=" + 
TestUtils.federatedAddress("localhost", port2, input("AL")),
+                               "in_BU=" + 
TestUtils.federatedAddress("localhost", port3, input("BU")),
+                               "in_BL=" + 
TestUtils.federatedAddress("localhost", port4, input("BL")), "rows=" + rows, 
"cols=" + cols,
+                               "spec_file=" + SCRIPT_DIR + TEST_DIR + SPEC, 
"out=" + output("FO"), "format=" + format.toString()};
+
+                       // run test
+                       runTest(true, false, null, -1);
+
+                       // compare matrices (values recoded to identical codes)
+                       FrameReader reader = 
FrameReaderFactory.createFrameReader(format);
+                       FrameBlock FO = reader.readFrameFromHDFS(output("FO"), 
15, 2);
+                       HashMap<String, Long> cFA = getCounts(A, B);
+                       Iterator<String[]> iterFO = FO.getStringRowIterator();
+                       while(iterFO.hasNext()) {
+                               String[] row = iterFO.next();
+                               Double expected = (double) cFA.get(row[1]);
+                               Double val = (row[0] != null) ? 
Double.parseDouble(row[0]) : 0;
+                               Assert.assertEquals("Output aggregates don't 
match: " + expected + " vs " + val, expected, val);
+                       }
+               }
+               catch(Exception ex) {
+                       ex.printStackTrace();
+                       Assert.fail(ex.getMessage());
+               }
+               finally {
+                       TestUtils.shutdownThread(t1);
+                       TestUtils.shutdownThread(t2);
+                       TestUtils.shutdownThread(t3);
+                       TestUtils.shutdownThread(t4);
+                       rtplatform = platformOld;
+               }
+       }
+
+       private static HashMap<String, Long> getCounts(double[][] countFrame, 
double[][] groupFrame) {
+               HashMap<String, Long> ret = new HashMap<>();
+               for(int i = 0; i < countFrame.length; i++) {
+                       String key = "Str" + groupFrame[i][0];
+                       Long tmp = ret.get(key);
+                       ret.put(key, (tmp != null) ? tmp + 1 : 1);
+               }
+               return ret;
+       }
+}
diff --git 
a/src/test/scripts/functions/transform/TransformFederatedEncodeDecode.dml 
b/src/test/scripts/functions/transform/TransformFederatedEncodeDecode.dml
new file mode 100644
index 0000000..1ff5446
--- /dev/null
+++ b/src/test/scripts/functions/transform/TransformFederatedEncodeDecode.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+F1 = federated(type="frame", addresses=list($in_AU, $in_AL, $in_BU, $in_BL), 
ranges=
+  list(list(0,0), list($rows / 2, $cols / 2), # AUpper range
+    list($rows / 2, 0), list($rows, $cols / 2), # ALower range
+    list(0, $cols / 2), list($rows / 2, $cols), # BUpper range
+    list($rows / 2, $cols / 2), list($rows, $cols))); # BLower range
+jspec = read($spec_file, data_type="scalar", value_type="string");
+
+[X, M] = transformencode(target=F1, spec=jspec);
+
+A = aggregate(target=X[,1], groups=X[,2], fn="count");
+Ag = cbind(A, seq(1,nrow(A)));
+
+F2 = transformdecode(target=Ag, spec=jspec, meta=M);
+
+write(F2, $out, format=$format);
+

Reply via email to