Hi Michalis,
can you double-check that you define the types of the Tuple2 output?
According to the error
"Return type
PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>"
it seems that the Tuple2 fields do not have specific types and are treated
as plain java.lang.Object. That could be the problem.
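To illustrate why the fields can show up as java.lang.Object, here is a minimal sketch in plain Java, with no Flink or Wayang dependency. PairSketch is a simplified, hypothetical stand-in for Tuple2, and ErasureDemo is only an illustration of generic type erasure, not Flink's actual type extraction code. It shows that reflection alone cannot recover the type arguments of a generic class, so an analyzer needs explicit type information to do better.

```java
import java.lang.reflect.Field;

// Simplified, hypothetical stand-in for org.apache.wayang.basic.data.Tuple2 (not the real class).
class PairSketch<T0, T1> {
    public T0 field0;
    public T1 field1;

    PairSketch(T0 field0, T1 field1) {
        this.field0 = field0;
        this.field1 = field1;
    }
}

class ErasureDemo {
    // Returns the field type as visible at runtime, i.e. after generic erasure.
    static Class<?> erasedFieldType(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getDeclaredField(fieldName);
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        PairSketch<String, Integer> pair = new PairSketch<>("a", 1);
        // Both lines print "class java.lang.Object": the type arguments are erased.
        System.out.println(erasedFieldType(pair.getClass(), "field0"));
        System.out.println(erasedFieldType(pair.getClass(), "field1"));
    }
}
```

Even though the pair is declared as PairSketch<String, Integer>, the runtime only sees Object for both fields, which matches the GenericType<java.lang.Object> entries in the error message.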
Also, to give some context: if you look at the FlinkMaterializedGroupByOperator
code, there are some utility functions that we use to map the Wayang UDFs to the
Flink (or Spark) UDFs. For example, the line
final KeySelector<Type, KeyType> keyExtractor =
        flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
converts the Wayang UDF keyDescriptor into a Flink UDF KeySelector. If you
check the code of this class, you may be able to spot the problem.
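As a rough sketch of that mapping idea, the plain-Java example below wraps a key function together with its declared key class and refuses to group when the key class is only Object. All names here (KeyDescriptorSketch, TypedKeySelectorSketch, KeySelectorCompilerSketch) are hypothetical stand-ins, not the Wayang or Flink classes, and the Object check is a deliberately simplified imitation of a "not a valid key type" validation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a key descriptor: a key function plus its declared key class.
class KeyDescriptorSketch<In, Key> {
    final Function<In, Key> keyFunction;
    final Class<Key> keyClass;

    KeyDescriptorSketch(Function<In, Key> keyFunction, Class<Key> keyClass) {
        this.keyFunction = keyFunction;
        this.keyClass = keyClass;
    }
}

// Hypothetical stand-in for an engine-side key selector that also reports its key class.
interface TypedKeySelectorSketch<In, Key> {
    Key getKey(In value);
    Class<Key> getKeyClass();
}

class KeySelectorCompilerSketch {
    // Wraps the descriptor so the engine side keeps both the function and the key class.
    static <In, Key> TypedKeySelectorSketch<In, Key> compileKeySelector(
            KeyDescriptorSketch<In, Key> descriptor) {
        return new TypedKeySelectorSketch<In, Key>() {
            @Override public Key getKey(In value) { return descriptor.keyFunction.apply(value); }
            @Override public Class<Key> getKeyClass() { return descriptor.keyClass; }
        };
    }

    // Groups values by key; rejects selectors whose key class is only Object (simplified check).
    static <In, Key> Map<Key, List<In>> groupBy(
            List<In> input, TypedKeySelectorSketch<In, Key> selector) {
        if (selector.getKeyClass().equals(Object.class)) {
            throw new IllegalArgumentException("java.lang.Object is not a valid key type");
        }
        Map<Key, List<In>> groups = new LinkedHashMap<>();
        for (In value : input) {
            groups.computeIfAbsent(selector.getKey(value), k -> new ArrayList<>()).add(value);
        }
        return groups;
    }

    public static void main(String[] args) {
        KeyDescriptorSketch<String, Integer> byLength =
                new KeyDescriptorSketch<>(String::length, Integer.class);
        // Prints {1=[a], 2=[bb, cc]}
        System.out.println(groupBy(Arrays.asList("a", "bb", "cc"), compileKeySelector(byLength)));
    }
}
```

The point of the sketch is only that the key type has to travel with the selector; how the real compiler and KeySelectorFunction carry that information is something to verify in the Wayang sources.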
Best
--
Zoi
On Tuesday, April 25, 2023 at 01:30:20 PM CEST, Michalis Vargiamis wrote:
Hello!
I've been working on the missing operator tests for Flink. I've
successfully done the SortOperator and the UnionAllOperator by following
the respective Spark operator tests and replacing RddChannel with
DataSetChannel.
I'm having trouble with the tests for other operators though, for
example the FlinkMaterializedGroupByOperator. I started from
SparkMaterializedGroupByOperatorTest and made the same RddChannel to
DataSetChannel modifications as before, but I get the following error:
[ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution  Time elapsed: 1.911 s  <<< ERROR!
org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
    at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
Digging into the operator code a bit more, the error happens at
dataSetInput.groupBy(keyExtractor);
Any ideas on what should be changed?
Here is a permalink to the respective Spark test:
[https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
Thank you,
Michalis Vargiamis