carloea2 opened a new pull request, #5078:
URL: https://github.com/apache/texera/pull/5078

   Implemented a prototype **Compiled C++ UDF Operator** for Texera.
   
   The operator lets users write C++ directly inside the workflow UI, configure 
input/output columns, compile the code, and run it as a workflow operator. The 
MVP supports typed tuple/batch/table-style APIs, compiler-error reporting, 
retained input columns, configurable compiler flags, a timeout, and batching.
   
   To demonstrate why this is useful, I benchmarked the same deterministic 
CPU-heavy matrix multiplication workload across C++, Java UDF, and Python UDF. 
Each runtime receives its own independent source with the same seed/workload.
   
   ### Benchmark Results
   
   | Runtime | Avg ms / row | Min ms | Max ms | Runs | Speedup vs Python |
   |---|---:|---:|---:|---:|---:|
   | C++ | 0.122 | 0.103 | 0.477 | 80 | 132.7x |
   | Java | 1.073 | 0.388 | 5.673 | 80 | 15.1x |
   | Python | 16.220 | 9.235 | 34.053 | 80 | 1.0x |
   
   These results illustrate the motivation: Python is great for usability, but 
compiled C++ can deliver order-of-magnitude speedups for CPU-heavy UDFs (about 
133x over Python and about 9x over Java in this benchmark) while staying within 
Texera’s visual workflow model.
   
   ### C++ UDF
   
   ```cpp
   #include <chrono>
   
   /**
    * Benchmark UDF: for each input row, computes the sum of all entries of the
    * n x n product A * B, where A(i,k) and B(k,j) are generated on the fly from
    * the row's `seed` and `trial`, and reports how long that took.
    *
    * Emits (runtime tag "cpp", checksum, elapsed_ms). Presumably the input
    * columns are carried over by the operator's retained-input-columns setting
    * -- confirm.
    *
    * NOTE(review): C++ `%` truncates toward zero, whereas Python's `%` is
    * floored, so checksums agree with the Python UDF only when
    * `seed + trial * 97 + ...` is non-negative. Assumes non-negative
    * `seed`/`trial` -- confirm.
    */
   class MatrixMultiplyOperator : public texera::UDFOperator {
   public:
       texera::TupleOutput process_tuple(const texera::Tuple& tuple, int /*port*/) override {
           const int trial = tuple.get("trial").as_int();
           const int n = tuple.get("matrix_size").as_int();
           const long long seed = tuple.get("seed").as_long();

           // steady_clock is monotonic. high_resolution_clock may alias the
           // adjustable system_clock and is not recommended for timing intervals.
           const auto start = std::chrono::steady_clock::now();

           double checksum = 0.0;
           for (int i = 0; i < n; i++) {
               for (int j = 0; j < n; j++) {
                   double cell = 0.0;
                   for (int k = 0; k < n; k++) {
                       // The LL suffixes promote to long long before multiplying.
                       double a = ((seed + trial * 97LL + i * 31LL + k * 17LL) % 1000LL) / 1000.0;
                       double b = ((seed + trial * 53LL + k * 13LL + j * 29LL) % 1000LL) / 1000.0;
                       cell += a * b;
                   }
                   checksum += cell;
               }
           }

           const auto end = std::chrono::steady_clock::now();
           const double elapsed_ms = std::chrono::duration<double, std::milli>(end - start).count();

           return { texera::TupleLike{
               texera::Value::string_value("cpp"),
               texera::Value::double_value(checksum),
               texera::Value::double_value(elapsed_ms)
           }};
       }
   };

   // Exposes the operator under the alias `TexeraUDFOperator` (presumably the
   // name the compiled-UDF harness instantiates -- confirm).
   using TexeraUDFOperator = MatrixMultiplyOperator;
   ```
   
   ### Java UDF
   
   ```java
   /**
    * Benchmark UDF: sums every entry of the size x size product A * B, where
    * the entries are derived from the row's seed and trial, then returns the
    * input fields followed by ("java", checksum, elapsedMs).
    */
   public TupleLike processTuple(Tuple tuple) {
       final int trialId = ((Number) tuple.getField("trial")).intValue();
       final int size = ((Number) tuple.getField("matrix_size")).intValue();
       final long seed = ((Number) tuple.getField("seed")).longValue();

       final long startNanos = System.nanoTime();

       double checksum = 0.0;
       for (int row = 0; row < size; row++) {
           for (int col = 0; col < size; col++) {
               double cell = 0.0;
               for (int k = 0; k < size; k++) {
                   double a = ((seed + trialId * 97L + row * 31L + k * 17L) % 1000L) / 1000.0;
                   double b = ((seed + trialId * 53L + k * 13L + col * 29L) % 1000L) / 1000.0;
                   cell += a * b;
               }
               checksum += cell;
           }
       }

       final double elapsedMs = (System.nanoTime() - startNanos) / 1_000_000.0;

       // Keep every input field and append three result fields after them.
       final Object[] inputFields = tuple.getFields();
       final int base = inputFields.length;
       final Object[] outputFields = Arrays.copyOf(inputFields, base + 3);
       outputFields[base] = "java";
       outputFields[base + 1] = checksum;
       outputFields[base + 2] = elapsedMs;

       return TupleLike$.MODULE$.apply(Arrays.asList(outputFields));
   }
   ```
   
   ### Python UDF
   
   ```python
   from pytexera import *
   import time
   
   class ProcessTupleOperator(UDFOperatorV2):
       """Benchmark UDF: sums every entry of the n x n product A * B.

       A and B are generated on the fly from the row's ``seed`` and ``trial``.
       The runtime tag, checksum, and elapsed time are appended to the input row.

       NOTE(review): Python's ``%`` is floored, while the C++/Java versions
       truncate toward zero, so checksums agree across runtimes only for
       non-negative ``seed``/``trial`` values -- confirm.
       """

       @overrides
       def process_tuple(
           self, tuple_: Tuple, port: int
       ) -> Iterator[Optional[TupleLike]]:
           """Yield the input row extended with runtime, checksum and elapsed_ms.

           ``port`` is not used by this operator.
           """
           trial = int(tuple_["trial"])
           n = int(tuple_["matrix_size"])
           seed = int(tuple_["seed"])

           start = time.perf_counter()

           checksum = 0.0
           for i in range(n):
               for j in range(n):
                   cell = 0.0
                   for k in range(n):
                       a = ((seed + trial * 97 + i * 31 + k * 17) % 1000) / 1000.0
                       b = ((seed + trial * 53 + k * 13 + j * 29) % 1000) / 1000.0
                       cell += a * b
                   checksum += cell

           elapsed_ms = (time.perf_counter() - start) * 1000.0

           output = tuple_.as_dict()
           output["runtime"] = "python"
           output["checksum"] = checksum
           output["elapsed_ms"] = elapsed_ms
           yield output
   ```
   
   
https://github.com/user-attachments/assets/10fc6eee-8742-403e-9b11-11221edb1b9b
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to