This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 5ff6274788 [minor] Compression SP Schema apply
5ff6274788 is described below
commit 5ff6274788bf4f5c9b42c2e43494c7f99451571c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Feb 5 12:28:07 2025 +0100
[minor] Compression SP Schema apply
Closes #2215
---
.../runtime/instructions/SPInstructionParser.java | 2 ++
.../spark/BinaryFrameFrameSPInstruction.java | 25 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index 5c72b85436..5014c0ac30 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -39,6 +39,7 @@ import org.apache.sysds.lops.WeightedSquaredLossR;
import org.apache.sysds.lops.WeightedUnaryMM;
import org.apache.sysds.lops.WeightedUnaryMMR;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.AggregateTernarySPInstruction;
import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction;
@@ -195,6 +196,7 @@ public class SPInstructionParser extends InstructionParser
String2SPInstructionType.put( "freplicate", SPType.Binary);
String2SPInstructionType.put( "mapdropInvalidLength", SPType.Binary);
String2SPInstructionType.put( "valueSwap", SPType.Binary);
+ String2SPInstructionType.put( "applySchema" , SPType.Binary);
String2SPInstructionType.put( "_map", SPType.Ternary); // _map refers to the operation map
// Relational Instruction Opcodes
String2SPInstructionType.put( "==" , SPType.Binary);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
index 6f6232e71a..dfad7a165e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -59,6 +59,11 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
// Attach result frame with FrameBlock associated with output_name
sec.releaseFrameInput(input2.getName());
}
+ else if(getOpcode().equals("applySchema")){
+ Broadcast<FrameBlock> fb = sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName()));
+ out = in1.mapValues(new applySchema(fb.getValue()));
+ sec.releaseFrameInput(input2.getName());
+ }
else {
JavaPairRDD<Long, FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable(input2.getName());
// create output frame
@@ -70,7 +75,9 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
//set output RDD and maintain dependencies
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
- if( !getOpcode().equals("dropInvalidType") && !getOpcode().equals("valueSwap"))
+ if(!getOpcode().equals("dropInvalidType") && //
+ !getOpcode().equals("valueSwap") && //
+ !getOpcode().equals("applySchema"))
sec.addLineageRDD(output.getName(), input2.getName());
}
@@ -116,4 +123,20 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
return arg0.valueSwap(schema_frame);
}
}
+
+
+ private static class applySchema implements Function<FrameBlock, FrameBlock>{
+ private static final long serialVersionUID = 58504021316402L;
+
+ private FrameBlock schema;
+
+ public applySchema(FrameBlock schema ) {
+ this.schema = schema;
+ }
+
+ @Override
+ public FrameBlock call(FrameBlock arg0) throws Exception {
+ return arg0.applySchema(schema);
+ }
+ }
}