Hi there, I'm working on a small DSL project on top of the Beam SDK. In the DSL I
define aggregations on PCollections of "Row", a data type I defined that stores
its data as an "Object" and its type as a "TypeDescriptor" (both are member
variables of "Row"). I tried to reuse the aggregation functions in the SDK
rather than reinvent the wheel, but that turns out to be quite challenging,
especially when I try to create a ComposedCombineFn. Both the "CombineFn"s and
the "TupleTag"s are generated at expression parse/compile time, and I don't know
how to pass the type variables to the CombineFns.compose() method. I do know the
type information: from the generated AST I get the TypeDescriptor of the return
value of every "CombineFn". I just don't know how to use it programmatically to
make this work.
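One way to turn type information that is only known at parse time into something Java can check is to map each DSL type name to a boxed `Class` token and convert the untyped aggregate with `Class.cast`. The sketch below is plain Java. It assumes the DSL type names from the switch statement in the code below. `DslTypeMapping`, `javaClassFor` and `as` are hypothetical names, and `java.sql.Timestamp` is an assumed stand-in for whatever timestamp class the DSL really uses. If the DSL keeps a Beam `TypeDescriptor`, its `getRawType()` should also yield a `Class`. `Class` tokens only give run-time checks. The compiler still cannot infer the element type of a `TupleTag<?>`.

```java
// Hypothetical helper (not part of Beam): maps a DSL type name to the boxed Java
// class of the aggregated value, so the runtime type can be checked explicitly.
final class DslTypeMapping {
  private DslTypeMapping() {}

  static Class<?> javaClassFor(String dslTypeName) {
    // Each case returns, so there is no fall-through between cases.
    switch (dslTypeName) {
      case "boolean": return Boolean.class;
      case "bigint": return Long.class;
      case "double": return Double.class;
      case "timestamp": return java.sql.Timestamp.class; // assumed timestamp type
      case "varchar": return String.class;
      case "binary": return byte[].class;
      default: return Void.class;
    }
  }

  // Run-time checked conversion: throws ClassCastException on a type mismatch,
  // instead of relying on the compiler to infer a type it cannot know.
  static <T> T as(Object value, Class<T> type) {
    return type.cast(value);
  }
}
```

In the DoFn, something like `Long maxLatency = DslTypeMapping.as(e.get(finalOutputTags.get(0)), Long.class);` should then compile. The captured wildcard result of `get` is only passed on as an `Object` argument, so the compiler has no type to infer.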
Below are some code fragments I used to verify this (not the full code, just
enough to demonstrate the awkward situation I'm in). Note that the first version
didn't even compile. It failed with "no instance(s) of type variable(s) exist so
that capture etc." at the line
"
Long maxLatency = e.get(finalOutputTags.get(0));
"
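// SqlParser, SqlParsedStatement, SqlExecutableStatement, RowSchema, CellMeta and
// DslTypes are classes from my DSL project (not shown).
import java.sql.Timestamp; // assumed; replace with the DSL's timestamp class if different
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineFns;
import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult;
import org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

SqlParsedStatement statement = SqlParser.parse(expressions);
SqlExecutableStatement exe = SqlParser.compile(statement, schema);
// Separate name so the output schema does not clash with the input "schema".
RowSchema outputSchema = exe.getSchema();
List<TupleTag<?>> outputTags = new ArrayList<>();
for (CellMeta meta : outputSchema.getCellsMeta()) {
  // Each case needs a break; without it every later case also runs and adds
  // extra tags.
  switch (DslTypes.getTypeName(meta.getType())) {
    case "boolean":
      outputTags.add(new TupleTag<Boolean>() {});
      break;
    case "bigint":
      outputTags.add(new TupleTag<Long>() {});
      break;
    case "double":
      outputTags.add(new TupleTag<Double>() {});
      break;
    case "timestamp":
      outputTags.add(new TupleTag<Timestamp>() {});
      break;
    case "varchar":
      outputTags.add(new TupleTag<String>() {});
      break;
    case "binary":
      outputTags.add(new TupleTag<byte[]>() {});
      break;
    default:
      outputTags.add(new TupleTag<Void>() {});
  }
}
final TupleTagList finalOutputTags = TupleTagList.of(outputTags);
SimpleFunction<Long, Long> identityFn =
    new SimpleFunction<Long, Long>() {
      @Override
      public Long apply(Long input) {
        return input;
      }
    };
// The code below is a mock. In the real case the CombineFns are created at
// expression compile time, and their number is not fixed: it depends on the
// DSL definitions.
CombineFn<Long, ?, Long> fn1 = Max.ofLongs();
CombineFn<Long, ?, Long> fn2 = Max.ofLongs();
// TupleTagList.get(int) returns TupleTag<?>, so each tag is cast back to its
// element type (unchecked) before it is passed to with() and get().
@SuppressWarnings("unchecked")
TupleTag<Long> tag0 = (TupleTag<Long>) finalOutputTags.get(0);
@SuppressWarnings("unchecked")
TupleTag<Long> tag1 = (TupleTag<Long>) finalOutputTags.get(1);
ComposedCombineFn<Long> composedCombineFn =
    CombineFns.compose()
        .with(identityFn, fn1, tag0)
        .with(identityFn, fn2, tag1);
PCollection<CoCombineResult> maxAndMean = testPipeline
    .apply(Create.of(1L, 2L, 3L, 4L))
    .apply(Combine.globally(composedCombineFn));
PCollection<Long> finalResultCollection = maxAndMean
    .apply(ParDo.of(
        new DoFn<CoCombineResult, Long>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            CoCombineResult e = c.element();
            // e.get(finalOutputTags.get(0)) cannot be assigned to Long: get()
            // returns the captured wildcard type. Typed tags need no cast.
            Long maxLatency = e.get(tag0);
            Long meanLatency = e.get(tag1);
            System.out.println(maxLatency + meanLatency);
            c.output(maxLatency + meanLatency);
          }
        }));
testPipeline.run();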
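When the number and types of the CombineFns are only known at run time, the usual Java technique is a generic helper method that "captures" the wildcard of each `CombineFn<InputT, ?, ?>`, plus one unchecked cast on the matching `TupleTag<?>`. A full Beam pipeline is too large for a snippet, so the sketch below reproduces only the generic shapes with hypothetical stand-ins. `Tag<T>` plays `TupleTag<T>`, `Agg<I, O>` plays `CombineFn<I, ?, O>`, `Composed<I>` plays `ComposedCombineFn<I>`, and `Result` plays `CoCombineResult`. None of these are Beam classes, and `addUnchecked`/`compose` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins that mirror only the generic shapes of Beam's
// TupleTag, CombineFn, ComposedCombineFn and CoCombineResult.
final class WildcardCapture {
  private WildcardCapture() {}

  // Stand-in for TupleTag<T>: an identity key carrying a compile-time type.
  static final class Tag<T> {}

  // Stand-in for CombineFn<I, ?, O>: reduces all inputs to one output.
  interface Agg<I, O> {
    O combine(List<I> inputs);
  }

  // Stand-in for CoCombineResult: get() infers V from the tag's type.
  static final class Result {
    private final Map<Tag<?>, Object> values = new HashMap<>();

    @SuppressWarnings("unchecked")
    <V> V get(Tag<V> tag) {
      return (V) values.get(tag);
    }
  }

  // Stand-in for ComposedCombineFn<I>: with() ties each fn's output type to its tag.
  static final class Composed<I> {
    private final List<Agg<I, ?>> fns = new ArrayList<>();
    private final List<Tag<?>> tags = new ArrayList<>();

    <O> Composed<I> with(Agg<I, O> fn, Tag<O> tag) {
      fns.add(fn);
      tags.add(tag);
      return this;
    }

    Result combine(List<I> inputs) {
      Result result = new Result();
      for (int i = 0; i < fns.size(); i++) {
        result.values.put(tags.get(i), fns.get(i).combine(inputs));
      }
      return result;
    }
  }

  // Capture helper: called with an Agg<I, ?>, the compiler infers O as the
  // captured wildcard type, so only the tag needs an unchecked cast.
  @SuppressWarnings("unchecked")
  private static <I, O> Composed<I> addUnchecked(Composed<I> composed, Agg<I, O> fn, Tag<?> tag) {
    return composed.with(fn, (Tag<O>) tag);
  }

  // Builds the composition from lists whose element types are only known at run time.
  static <I> Composed<I> compose(List<Agg<I, ?>> fns, List<Tag<?>> tags) {
    Composed<I> composed = new Composed<>();
    for (int i = 0; i < fns.size(); i++) {
      composed = addUnchecked(composed, fns.get(i), tags.get(i));
    }
    return composed;
  }
}
```

A Beam version of `addUnchecked` would presumably take a `SimpleFunction<DataT, InputT>`, a `CombineFn<InputT, ?, OutputT>` and a `TupleTag<?>`, and forward them to `with(...)` in that order. One difference from the stand-in: `CombineFns.compose()` returns a builder rather than a `ComposedCombineFn`, so the first `with(...)` call is made on the builder. That Beam variant is untested here. The unchecked cast is only safe if each tag really was created for its CombineFn's output type.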