Hi there,

I'm working on a small DSL project on top of the Beam SDK. In the DSL I define
aggregations over a PCollection of "Row", a data type I defined that holds its
data as "Object" and its type as "TypeDescriptor"; both are member variables of
"Row".

I am trying to reuse the aggregation functions in the SDK rather than reinvent
the wheel. However, that is proving quite challenging, especially when I try to
create a ComposedCombineFn: both the "CombineFn" and the "TupleTag" are
generated at expression parse/compile time, and I don't know how to pass the
type variable to the CombineFns.compose() method. I do know the type
information, because from the generated AST I get the TypeDescriptor of the
return value of every "CombineFn"; I just don't know how to use it
programmatically to make this work.

Below are some code fragments I used to verify this (not the full code, just
enough to demonstrate the embarrassing situation I am in). Note that this does
not even compile; it fails with "error: no instance(s) of type variable(s)
exist so that capture etc." at the line
"Long maxLatency = e.get(finalOutputTags.get(0));".

SqlParsedStatement statement = SqlParser.parse(expressions);

SqlExecutableStatement exe = SqlParser.compile(statement, schema);

RowSchema schema = exe.getSchema();

List<TupleTag<?>> outputTags = new ArrayList<>();

for (CellMeta meta : schema.getCellsMeta()) {

    // Each case needs a break; otherwise every branch falls through and
    // adds all of the remaining tags as well.
    switch (DslTypes.getTypeName(meta.getType())) {
        case "boolean":
            outputTags.add(new TupleTag<Boolean>() {});
            break;
        case "bigint":
            outputTags.add(new TupleTag<Long>() {});
            break;
        case "double":
            outputTags.add(new TupleTag<Double>() {});
            break;
        case "timestamp":
            outputTags.add(new TupleTag<Timestamp>() {});
            break;
        case "varchar":
            outputTags.add(new TupleTag<String>() {});
            break;
        case "binary":
            outputTags.add(new TupleTag<byte[]>() {});
            break;
        default:
            outputTags.add(new TupleTag<Void>() {});
            break;
    }
}

final TupleTagList finalOutputTags = TupleTagList.of(outputTags);

SimpleFunction<Long, Long> identityFn =
    new SimpleFunction<Long, Long>() {
        @Override
        public Long apply(Long input) {
            return input;
        }
    };


// The code below is a mock. In the real case I will create the CombineFns
// at expression compile time, and their number is not fixed; it depends
// on the DSL definitions.
CombineFn fn1 = Max.ofLongs();
CombineFn fn2 = Max.ofLongs();

ComposedCombineFn composedCombineFn =
    CombineFns.compose().with(identityFn, fn1, finalOutputTags.get(0));
composedCombineFn =
    composedCombineFn.with(identityFn, fn2, finalOutputTags.get(1));

PCollection<CoCombineResult> maxAndMean = testPipeline
    .apply(Create.of(1L, 2L, 3L, 4L))
    .apply(
        Combine.globally(composedCombineFn));

PCollection<Long> finalResultCollection = maxAndMean
        .apply(ParDo.of(
                new DoFn<CoCombineResult, Long>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        CoCombineResult e = c.element();
                        Long maxLatency = e.get(finalOutputTags.get(0));
                        Long meanLatency = e.get(finalOutputTags.get(1));
                        System.out.println (maxLatency + meanLatency);
                        c.output(maxLatency + meanLatency);
                    }
                }))
    ;

testPipeline.run();
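
As far as I can tell, the compile error comes from the wildcard: the tags are
stored as TupleTag<?>, so a generic get(TupleTag<V>) can only infer V as a
wildcard capture, which is not assignable to Long. The sketch below reproduces
that shape without Beam and shows one possible way around it: keep a Class
token next to each tag and narrow the wildcard through a checked helper. Tag,
Result, WildcardTagDemo, tagsFor and typed are hypothetical stand-ins that I
made up for illustration; they are not Beam classes, and I have not verified
that the same approach carries over to ComposedCombineFn unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a typed tag; it also keeps the value class at runtime.
final class Tag<T> {
    final Class<T> type;

    Tag(Class<T> type) {
        this.type = type;
    }
}

// Hypothetical stand-in for a combine result: values looked up by tag.
final class Result {
    private final Map<Tag<?>, Object> values = new HashMap<>();

    <T> void put(Tag<T> tag, T value) {
        values.put(tag, value);
    }

    // The return type follows the tag's type parameter.
    <T> T get(Tag<T> tag) {
        return tag.type.cast(values.get(tag));
    }
}

final class WildcardTagDemo {
    // Builds tags from type names that are only known at runtime.
    static List<Tag<?>> tagsFor(List<String> typeNames) {
        List<Tag<?>> tags = new ArrayList<>();
        for (String name : typeNames) {
            switch (name) {
                case "bigint":
                    tags.add(new Tag<Long>(Long.class));
                    break;
                case "double":
                    tags.add(new Tag<Double>(Double.class));
                    break;
                case "varchar":
                    tags.add(new Tag<String>(String.class));
                    break;
                default:
                    throw new IllegalArgumentException("unsupported type: " + name);
            }
        }
        return tags;
    }

    // Narrows a wildcard tag to Tag<T> after checking the stored class token.
    @SuppressWarnings("unchecked")
    static <T> Tag<T> typed(Tag<?> tag, Class<T> expected) {
        if (!expected.equals(tag.type)) {
            throw new ClassCastException("tag holds " + tag.type + ", not " + expected);
        }
        return (Tag<T>) tag;
    }

    public static void main(String[] args) {
        List<Tag<?>> tags = tagsFor(Arrays.asList("bigint", "bigint"));
        Result result = new Result();
        result.put(typed(tags.get(0), Long.class), 4L);
        result.put(typed(tags.get(1), Long.class), 3L);

        // Long bad = result.get(tags.get(0)); // rejected: T is a wildcard capture
        Long first = result.get(typed(tags.get(0), Long.class));
        Long second = result.get(typed(tags.get(1), Long.class));
        System.out.println(first + second);
    }
}
```

The point of the helper is that the unchecked cast is confined to one place
and guarded by a runtime check, so a mismatch between the DSL type and the
expected Java type fails early instead of surfacing later in a DoFn.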
