This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new ae2ad07cae [SYSTEMDS-3787] New rewrite for transformencode w/o
metadata output
ae2ad07cae is described below
commit ae2ad07caed2c54108794f618f81179839d136b3
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Oct 24 12:13:11 2024 +0200
[SYSTEMDS-3787] New rewrite for transformencode w/o metadata output
This patch adds a simple statement block rewrite that checks whether the
metadata frame output of transformencode is used at all and, if not,
sets a flag on the transformencode operation to avoid allocating and
serializing this metadata. The rewrite applies in about half of all
existing 'org.apache.sysds.test.functions.transform' tests.
---
.../apache/sysds/hops/rewrite/ProgramRewriter.java | 1 +
.../rewrite/RewriteRemoveTransformEncodeMeta.java | 71 ++++++++++++++++++++++
...ltiReturnParameterizedBuiltinCPInstruction.java | 22 ++++---
...ltiReturnParameterizedBuiltinSPInstruction.java | 24 +++++---
4 files changed, 103 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java
b/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java
index 54524021b9..cd440a6bcf 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/ProgramRewriter.java
@@ -115,6 +115,7 @@ public class ProgramRewriter{
_sbRuleSet.add( new
RewriteMarkLoopVariablesUpdateInPlace() );
if( LineageCacheConfig.getCompAssRW() )
_sbRuleSet.add( new MarkForLineageReuse()
);
+ _sbRuleSet.add( new
RewriteRemoveTransformEncodeMeta() );
}
// DYNAMIC REWRITES (which do require size information)
diff --git
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteRemoveTransformEncodeMeta.java
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteRemoveTransformEncodeMeta.java
new file mode 100644
index 0000000000..70023fb9ca
--- /dev/null
+++
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteRemoveTransformEncodeMeta.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.hops.rewrite;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysds.hops.FunctionOp;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.LiteralOp;
+import org.apache.sysds.parser.StatementBlock;
+
+/**
+ * Rule: If transformencode produces a meta data frame which is never
+ * used, flag transformencode to never allocate and serialize this frame.
+ * <p>
+ * The flag is realized as a trailing boolean literal input (false) appended
+ * to the transformencode {@link FunctionOp}. Because the rewrite only applies
+ * while the function op has exactly two inputs, applying it repeatedly is a no-op.
+ */
+public class RewriteRemoveTransformEncodeMeta extends StatementBlockRewriteRule
+{
+	private final static String TF_OPCODE = "TRANSFORMENCODE";
+
+	/**
+	 * Flags the root transformencode function call of the given statement
+	 * block if its meta data output (second output) is not live-out.
+	 *
+	 * @param sb statement block to inspect; its root function op may be modified in place
+	 * @param state program rewrite status (not used by this rule)
+	 * @return a singleton list containing the given statement block
+	 */
+	@Override
+	public List<StatementBlock> rewriteStatementBlock(StatementBlock sb, ProgramRewriteStatus state)
+	{
+		if( sb.getHops() == null || sb.getHops().isEmpty() )
+			return Arrays.asList(sb);
+
+		//Transformencode is a multi-return FunctionOp and always appears as root
+		//of the DAG. We then check that the meta data object is never used,
+		//that is, the meta data is not in the live-out variables of the statement block
+		Hop root = sb.getHops().get(0);
+		if( root instanceof FunctionOp
+			&& TF_OPCODE.equals(((FunctionOp)root).getFunctionName()) )
+		{
+			FunctionOp func = (FunctionOp)root;
+			String[] outNames = func.getOutputVariableNames();
+			//guard the index access below: output 1 is the meta data frame
+			if( outNames != null && outNames.length > 1
+				&& !sb.liveOut().containsVariable(outNames[1])
+				&& func.getInput().size() == 2 ) //flag not added yet
+			{
+				func.getInput().add(new LiteralOp(false));
+				LOG.debug("Applied removeTransformEncodeMeta (line "+ func.getBeginLine() +").");
+			}
+		}
+
+		return Arrays.asList(sb);
+	}
+
+	/** This rule has no multi-block rewrite; returns the given blocks unchanged. */
+	@Override
+	public List<StatementBlock> rewriteStatementBlocks(List<StatementBlock> sbs, ProgramRewriteStatus state) {
+		return sbs;
+	}
+
+	/** This rule never splits a DAG. */
+	@Override
+	public boolean createsSplitDag() {
+		return false;
+	}
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index 1c50eeb6c7..f5ba63b0ef 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -39,11 +39,13 @@ import
org.apache.sysds.runtime.transform.encode.MultiColumnEncoder;
public class MultiReturnParameterizedBuiltinCPInstruction extends
ComputationCPInstruction {
protected final ArrayList<CPOperand> _outputs;
-
+	//true if the meta data frame output is computed; false if the instruction
+	//string carries an optional boolean literal operand disabling it
+ protected final boolean _metaReturn;
+
+	/**
+	 * @param metaReturn if false, the meta data frame is not computed and an
+	 *   empty frame is used as meta data output instead
+	 */
private MultiReturnParameterizedBuiltinCPInstruction(Operator op,
CPOperand input1, CPOperand input2,
- ArrayList<CPOperand> outputs, String opcode, String istr) {
+ boolean metaReturn, ArrayList<CPOperand> outputs, String
opcode, String istr) {
super(CPType.MultiReturnBuiltin, op, input1, input2,
outputs.get(0), opcode, istr);
_outputs = outputs;
+ _metaReturn = metaReturn;
}
public CPOperand getOutput(int i) {
@@ -67,9 +69,14 @@ public class MultiReturnParameterizedBuiltinCPInstruction
extends ComputationCPI
// one input and two outputs
CPOperand in1 = new CPOperand(parts[1]);
CPOperand in2 = new CPOperand(parts[2]);
- outputs.add(new CPOperand(parts[3], ValueType.FP64,
DataType.MATRIX));
- outputs.add(new CPOperand(parts[4], ValueType.STRING,
DataType.FRAME));
- return new
MultiReturnParameterizedBuiltinCPInstruction(null, in1, in2, outputs, opcode,
str);
+ int pos = 3;
+ boolean metaReturn = true;
+ if( parts.length == 7 ) //no need for meta data
+ metaReturn = new
CPOperand(parts[pos++]).getLiteral().getBooleanValue();
+ outputs.add(new CPOperand(parts[pos], ValueType.FP64,
DataType.MATRIX));
+ outputs.add(new CPOperand(parts[pos+1],
ValueType.STRING, DataType.FRAME));
+ return new MultiReturnParameterizedBuiltinCPInstruction(
+ null, in1, in2, metaReturn, outputs, opcode,
str);
}
else {
throw new DMLRuntimeException("Invalid opcode in
MultiReturnBuiltin instruction: " + opcode);
@@ -87,9 +94,10 @@ public class MultiReturnParameterizedBuiltinCPInstruction
extends ComputationCPI
// execute block transform encode
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec,
colnames, fin.getNumColumns(), null);
// TODO: Assign #threads in compiler and pass via the
instruction string
-		MatrixBlock data = encoder.encode(fin, OptimizerUtils.getTransformNumThreads()); // build and apply
-		FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING),
-			OptimizerUtils.getTransformNumThreads());
+		int k = OptimizerUtils.getTransformNumThreads();
+		MatrixBlock data = encoder.encode(fin, k); // build and apply
+		//skip meta data construction if the instruction flagged it as unused
+		FrameBlock meta = !_metaReturn ? new FrameBlock() :
+			encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING), k);
meta.setColumnNames(colnames);
// release input and outputs
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 8d2546a612..ebd1bf7b44 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -77,10 +77,12 @@ import scala.Tuple2;
public class MultiReturnParameterizedBuiltinSPInstruction extends
ComputationSPInstruction {
protected ArrayList<CPOperand> _outputs;
+	//true if the meta data frame output is set from the computed meta data;
+	//false if the instruction flagged it as unused (an empty frame is set instead)
+ protected final boolean _metaReturn;
+	/**
+	 * @param metaReturn if false, an empty frame is set as meta data output
+	 */
private MultiReturnParameterizedBuiltinSPInstruction(Operator op,
CPOperand input1, CPOperand input2,
- ArrayList<CPOperand> outputs, String opcode, String istr) {
+ boolean metaReturn, ArrayList<CPOperand> outputs, String
opcode, String istr) {
super(SPType.MultiReturnBuiltin, op, input1, input2,
outputs.get(0), opcode, istr);
+ _metaReturn = metaReturn;
_outputs = outputs;
}
@@ -93,14 +95,17 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
// one input and two outputs
CPOperand in1 = new CPOperand(parts[1]);
CPOperand in2 = new CPOperand(parts[2]);
- outputs.add(new CPOperand(parts[3], ValueType.FP64,
DataType.MATRIX));
- outputs.add(new CPOperand(parts[4], ValueType.STRING,
DataType.FRAME));
- return new
MultiReturnParameterizedBuiltinSPInstruction(null, in1, in2, outputs, opcode,
str);
+ int pos = 3;
+ boolean metaReturn = true;
+ if( parts.length == 6 ) //no need for meta data
+ metaReturn = new
CPOperand(parts[pos++]).getLiteral().getBooleanValue();
+ outputs.add(new CPOperand(parts[pos], ValueType.FP64,
DataType.MATRIX));
+ outputs.add(new CPOperand(parts[pos+1],
ValueType.STRING, DataType.FRAME));
+ return new
MultiReturnParameterizedBuiltinSPInstruction(null, in1, in2, metaReturn,
outputs, opcode, str);
}
else {
throw new DMLRuntimeException("Invalid opcode in
MultiReturnBuiltin instruction: " + opcode);
}
-
}
@Override
@@ -112,8 +117,8 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
// get input RDD and meta data
FrameObject fo = sec.getFrameObject(input1.getName());
FrameObject fometa =
sec.getFrameObject(_outputs.get(1).getName());
- JavaPairRDD<Long, FrameBlock> in = (JavaPairRDD<Long,
FrameBlock>) sec.getRDDHandleForFrameObject(fo,
- FileFormat.BINARY);
+ JavaPairRDD<Long, FrameBlock> in = (JavaPairRDD<Long,
FrameBlock>)
+ sec.getRDDHandleForFrameObject(fo,
FileFormat.BINARY);
String spec =
ec.getScalarInput(input2).getStringValue();
DataCharacteristics mcIn =
sec.getDataCharacteristics(input1.getName());
DataCharacteristics mcOut =
sec.getDataCharacteristics(output.getName());
@@ -163,7 +168,10 @@ public class MultiReturnParameterizedBuiltinSPInstruction
extends ComputationSPI
// set output and maintain lineage/output
characteristics
sec.setRDDHandleForVariable(_outputs.get(0).getName(),
out);
sec.addLineageRDD(_outputs.get(0).getName(),
input1.getName());
- sec.setFrameOutput(_outputs.get(1).getName(), meta);
+ if( _metaReturn )
+ sec.setFrameOutput(_outputs.get(1).getName(),
meta);
+ else
+ sec.setFrameOutput(_outputs.get(1).getName(),
new FrameBlock());
}
catch(IOException ex) {
throw new RuntimeException(ex);